You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/20 00:47:45 UTC
[1/2] kafka git commit: Cherry-pick KAFKA-3941: Delay eviction
listener in InMemoryKeyValueLoggedStore after restoration
Repository: kafka
Updated Branches:
refs/heads/0.10.0 0bb1d3ae5 -> a7f9396b0
Cherry-pick KAFKA-3941: Delay eviction listener in InMemoryKeyValueLoggedStore after restoration
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7bd06e92
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7bd06e92
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7bd06e92
Branch: refs/heads/0.10.0
Commit: 7bd06e9255544c3b9c536b6b9417e4bb52c830b3
Parents: 4091a13
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jul 13 18:11:25 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 19 17:46:50 2016 -0700
----------------------------------------------------------------------
.../internals/InMemoryKeyValueLoggedStore.java | 27 ++++++++-------
.../InMemoryKeyValueStoreSupplier.java | 26 +++++++++++++--
.../InMemoryLRUCacheStoreSupplier.java | 15 +++------
.../streams/state/internals/MemoryLRUCache.java | 35 +++++++++++++++++---
.../internals/MemoryNavigableLRUCache.java | 10 +++---
.../internals/RocksDBKeyValueStoreSupplier.java | 4 ++-
.../streams/state/internals/RocksDBStore.java | 2 +-
.../internals/RocksDBWindowStoreSupplier.java | 4 ++-
.../processor/internals/StandbyTaskTest.java | 4 ---
9 files changed, 84 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index efcdac7..2c45d1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,7 +34,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
private final Serde<V> valueSerde;
private final String storeName;
- private StateSerdes<K, V> serdes;
private StoreChangeLogger<K, V> changeLogger;
private StoreChangeLogger.ValueGetter<K, V> getter;
@@ -54,30 +52,31 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context, StateStore root) {
+ inner.init(context, root);
+
// construct the serde
- this.serdes = new StateSerdes<>(storeName,
+ StateSerdes<K, V> serdes = new StateSerdes<>(storeName,
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
- context.register(root, true, new StateRestoreCallback() {
- @Override
- public void restore(byte[] key, byte[] value) {
-
- // directly call inner functions so that the operation is not logged
- inner.put(serdes.keyFrom(key), serdes.valueFrom(value));
- }
- });
-
- inner.init(context, root);
-
this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
@Override
public V get(K key) {
return inner.get(key);
}
};
+
+ // if the inner store is an LRU cache, add the eviction listener to log removed record
+ if (inner instanceof MemoryLRUCache) {
+ ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
+ @Override
+ public void apply(K key, V value) {
+ removed(key);
+ }
+ });
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index a25153c..8b498a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
import java.util.Iterator;
import java.util.List;
@@ -67,7 +69,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
- return new MeteredKeyValueStore<>(new MemoryStore<>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
+ MemoryStore<K, V> store = new MemoryStore<>(name, keySerde, valueSerde);
+
+ return new MeteredKeyValueStore<>(store.enableLogging(), "in-memory-state", time);
}
private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -76,6 +80,8 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
private final Serde<V> valueSerde;
private final NavigableMap<K, V> map;
+ private StateSerdes<K, V> serdes;
+
public MemoryStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
this.name = name;
this.keySerde = keySerde;
@@ -98,7 +104,23 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context, StateStore root) {
- // do nothing
+ // construct the serde
+ this.serdes = new StateSerdes<>(name,
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+ // register the store
+ context.register(root, true, new StateRestoreCallback() {
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ // check value for null, to avoid deserialization error.
+ if (value == null) {
+ put(serdes.keyFrom(key), null);
+ } else {
+ put(serdes.keyFrom(key), serdes.valueFrom(value));
+ }
+ }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 4a4fa5f..20a7333 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -52,17 +52,10 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
return name;
}
- @SuppressWarnings("unchecked")
public StateStore get() {
- final MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<K, V>(name, capacity);
- final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(keySerde, valueSerde);
- final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
- cache.whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
- @Override
- public void apply(K key, V value) {
- loggedCache.removed(key);
- }
- });
- return store;
+ MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde);
+ InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore<K, V>) cache.enableLogging();
+
+ return new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index d410e02..1899389 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -48,16 +50,25 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
void apply(K key, V value);
}
+ private final Serde<K> keySerde;
+ private final Serde<V> valueSerde;
+
protected String name;
protected Map<K, V> map;
protected Set<K> keys;
+ private StateSerdes<K, V> serdes;
protected EldestEntryRemovalListener<K, V> listener;
// this is used for extended MemoryNavigableLRUCache only
- public MemoryLRUCache() {}
+ public MemoryLRUCache(Serde<K> keySerde, Serde<V> valueSerde) {
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ }
+
+ public MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
+ this(keySerde, valueSerde);
- public MemoryLRUCache(String name, final int maxCacheSize) {
this.name = name;
this.keys = new HashSet<>();
@@ -78,7 +89,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
};
}
- public KeyValueStore<K, V> enableLogging(Serde<K> keySerde, Serde<V> valueSerde) {
+ public KeyValueStore<K, V> enableLogging() {
return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
}
@@ -96,7 +107,23 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context, StateStore root) {
- // do nothing
+ // construct the serde
+ this.serdes = new StateSerdes<>(name,
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+ // register the store
+ context.register(root, true, new StateRestoreCallback() {
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ // check value for null, to avoid deserialization error.
+ if (value == null) {
+ put(serdes.keyFrom(key), null);
+ } else {
+ put(serdes.keyFrom(key), serdes.valueFrom(value));
+ }
+ }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 99bac93..5eb4f49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -27,8 +28,8 @@ import java.util.TreeSet;
public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
- public MemoryNavigableLRUCache(String name, final int maxCacheSize) {
- super();
+ public MemoryNavigableLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
+ super(keySerde, valueSerde);
this.name = name;
this.keys = new TreeSet<>();
@@ -57,15 +58,14 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
return this;
}
- @SuppressWarnings("unchecked")
@Override
public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryNavigableLRUCache.CacheIterator<K, V>(((NavigableSet) this.keys).subSet(from, true, to, false).iterator(), this.map);
+ return new MemoryNavigableLRUCache.CacheIterator<>(((NavigableSet<K>) this.keys).subSet(from, true, to, false).iterator(), this.map);
}
@Override
public KeyValueIterator<K, V> all() {
- return new MemoryNavigableLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+ return new MemoryNavigableLRUCache.CacheIterator<>(this.keys.iterator(), this.map);
}
private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index af98733..16111ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -53,6 +53,8 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
- return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde, valueSerde).enableLogging(), "rocksdb-state", time);
+ RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde);
+
+ return new MeteredKeyValueStore<>(store.enableLogging(), "rocksdb-state", time);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8f3bab0..73110f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -173,7 +173,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
if (this.cacheSize > 0) {
- this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
+ this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize, null, null)
.whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<K, RocksDBCacheEntry>() {
@Override
public void apply(K key, RocksDBCacheEntry entry) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 0407299..3a1bd59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -59,7 +59,9 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
- return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde).enableLogging(), "rocksdb-window", time);
+ RocksDBWindowStore<K, V> store = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
+
+ return new MeteredWindowStore<>(store.enableLogging(), "rocksdb-window", time);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index e7fb9a4..9e15e1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -115,15 +115,11 @@ public class StandbyTaskTest {
new PartitionInfo(storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0])
));
- System.out.println("added " + storeChangelogTopicName1);
-
restoreStateConsumer.updatePartitions(storeChangelogTopicName2, Utils.mkList(
new PartitionInfo(storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
));
-
- System.out.println("added " + storeChangelogTopicName2);
}
@Test
[2/2] kafka git commit: Merge branch '0.10.0' of
https://git-wip-us.apache.org/repos/asf/kafka into 0.10.0
Posted by gu...@apache.org.
Merge branch '0.10.0' of https://git-wip-us.apache.org/repos/asf/kafka into 0.10.0
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a7f9396b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a7f9396b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a7f9396b
Branch: refs/heads/0.10.0
Commit: a7f9396b07a8be2e2e7d43a8ac31f39c66a0f8da
Parents: 7bd06e9 0bb1d3a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Jul 19 17:47:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 19 17:47:17 2016 -0700
----------------------------------------------------------------------
build.gradle | 4 +-
.../kafka/clients/consumer/KafkaConsumer.java | 1 +
.../consumer/internals/ConsumerCoordinator.java | 12 ++-
.../consumer/internals/SubscriptionState.java | 41 +++++++---
.../kafka/clients/producer/ProducerConfig.java | 16 ++--
.../main/scala/kafka/log/FileMessageSet.scala | 20 +++--
.../kafka/tools/VerifyConsumerRebalance.scala | 3 +-
.../kafka/api/PlaintextConsumerTest.scala | 83 +++++++++++++++++++-
.../unit/kafka/log/FileMessageSetTest.scala | 67 +++++++++++++++-
docs/ops.html | 22 ++++--
docs/quickstart.html | 29 +++++--
.../apache/kafka/streams/kstream/Windows.java | 4 +-
.../kstream/internals/AbstractStream.java | 6 ++
.../streams/kstream/internals/KTableFilter.java | 3 +
.../kstream/internals/KTableRepartitionMap.java | 5 +-
.../streams/processor/TopologyBuilder.java | 38 ++++-----
.../processor/internals/RecordCollector.java | 29 ++++---
.../streams/state/internals/RocksDBStore.java | 21 +++--
.../kstream/internals/KTableFilterTest.java | 39 ++++++++-
.../streams/smoketest/SmokeTestDriver.java | 4 +-
.../kafka/streams/smoketest/SmokeTestUtil.java | 2 +-
.../kafka/test/MockProcessorSupplier.java | 6 ++
tests/kafkatest/services/kafka/kafka.py | 13 +--
.../services/kafka/templates/kafka.properties | 4 +-
tests/kafkatest/services/security/minikdc.py | 2 +-
.../services/security/security_config.py | 40 ++++++----
tests/kafkatest/services/zookeeper.py | 3 +-
.../core/zookeeper_security_upgrade_test.py | 2 +-
28 files changed, 400 insertions(+), 119 deletions(-)
----------------------------------------------------------------------