You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/06 21:14:37 UTC
[kafka] branch trunk updated: KAFKA-4750: Bypass null value and
treat it as deletes (#4508)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a88db95 KAFKA-4750: Bypass null value and treat it as deletes (#4508)
a88db95 is described below
commit a88db9595910a1a7f249f7c9f3effb14636955c7
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Feb 6 13:14:33 2018 -0800
KAFKA-4750: Bypass null value and treat it as deletes (#4508)
Here is the new rule for handling nulls:
* in the interface store, put(key, null) is handled normally and the value serde is still applied to null; hence the serde needs to handle null values.
* in the inner bytes store, null bytes after serialization are treated as deletes.
* in the interface store, if null bytes are returned by get(key), it indicates that the key is not available; hence the serde is bypassed and a null object is returned.
More changes:
* Update javadocs, add unit tests accordingly; augment MockContext to set serdes for the newly added tests.
* Fixed a discovered bug which is exposed by the newly added tests.
* Use the new API to remove all old APIs in the existing state store tests.
* Remove SerializedKeyValueIterator since it is not used any more.
This was originally contributed by @evis.
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@confluent.io>
---
.../apache/kafka/streams/kstream/Materialized.java | 3 +-
.../streams/state/KeyValueBytesStoreSupplier.java | 5 ++
.../apache/kafka/streams/state/KeyValueStore.java | 9 ++-
.../streams/state/SessionBytesStoreSupplier.java | 7 +-
.../apache/kafka/streams/state/SessionStore.java | 9 ++-
.../org/apache/kafka/streams/state/Stores.java | 9 ++-
.../streams/state/WindowBytesStoreSupplier.java | 7 +-
.../apache/kafka/streams/state/WindowStore.java | 5 +-
.../state/internals/CachingKeyValueStore.java | 30 ++++---
.../state/internals/CachingSessionStore.java | 5 +-
.../state/internals/CachingWindowStore.java | 3 +-
.../internals/ChangeLoggingKeyValueBytesStore.java | 11 +++
.../state/internals/InMemoryKeyValueStore.java | 22 ++---
.../state/internals/InnerMeteredKeyValueStore.java | 6 +-
.../streams/state/internals/MemoryLRUCache.java | 33 +++++---
.../state/internals/MemoryNavigableLRUCache.java | 7 --
.../state/internals/MeteredKeyValueBytesStore.java | 10 +--
.../state/internals/MeteredSessionStore.java | 14 ++--
.../streams/state/internals/RocksDBStore.java | 12 +--
.../state/internals/SegmentedBytesStore.java | 4 +-
.../internals/SerializedKeyValueIterator.java | 70 ----------------
.../internals/WindowStoreIteratorWrapper.java | 5 +-
.../internals/WrappedSessionStoreIterator.java | 3 +-
.../kstream/internals/KTableFilterTest.java | 18 +++--
.../state/internals/AbstractKeyValueStoreTest.java | 75 +++++++++++++++--
.../state/internals/CachingKeyValueStoreTest.java | 26 +++---
.../internals/InMemoryKeyValueLoggedStoreTest.java | 23 +-----
.../state/internals/InMemoryKeyValueStoreTest.java | 27 +++----
.../state/internals/InMemoryLRUCacheStoreTest.java | 29 +++----
.../state/internals/RocksDBKeyValueStoreTest.java | 31 +++----
.../internals/SerializedKeyValueIteratorTest.java | 94 ----------------------
.../apache/kafka/test/MockProcessorContext.java | 12 ++-
32 files changed, 269 insertions(+), 355 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index 48dd12e..89603fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -160,7 +160,8 @@ public class Materialized<K, V, S extends StateStore> {
* Set the valueSerde the materialized {@link StateStore} will use.
*
* @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
- * serde from configs will be used
+ * serde from configs will be used. If the serialized bytes is null for put operations,
+ * it is treated as delete operation
* @return itself
*/
public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
index 73e6732..b12e94e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
@@ -20,6 +20,11 @@ import org.apache.kafka.common.utils.Bytes;
/**
* A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore<Bytes, byte[]>} instances of type <Byte, byte[]>.
+ *
+ * For any stores implementing the {@link KeyValueStore KeyValueStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ *
+ * 1. Null value bytes in put operations should be treated as delete.
+ * 2. If the key does not exist, get operations should return null value bytes.
*/
public interface KeyValueBytesStoreSupplier extends StoreSupplier<KeyValueStore<Bytes, byte[]>> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index 129ae6b..3685229 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -33,7 +33,8 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* Update the value associated with this key
*
* @param key The key to associate the value to
- * @param value The value, it can be null.
+ * @param value The value to update, it can be null;
+ * if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for key.
*/
void put(K key, V value);
@@ -43,7 +44,8 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* is already associated with the key
*
* @param key The key to associate the value to
- * @param value The value, it can be null
+ * @param value The value to update, it can be null;
+ * if the serialized bytes are also null it is interpreted as deletes
* @return The old value or null if there is no such key.
* @throws NullPointerException If null is used for key.
*/
@@ -52,7 +54,8 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
/**
* Update all the given key/value pairs
*
- * @param entries A list of entries to put into the store.
+ * @param entries A list of entries to put into the store;
+ * if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for key.
*/
void putAll(List<KeyValue<K, V>> entries);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
index e5523da..04b0ceb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -19,7 +19,12 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
- * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>>} instances of type <Byte, byte[]>.
+ * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>} instances of type <Byte, byte[]>.
+ *
+ * For any stores implementing the {@link SessionStore SessionStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ *
+ * 1. Null value bytes in put operations should be treated as delete.
+ * 2. Null value bytes should never be returned in range query results.
*/
public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index fcbce5f..c1a6993 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -33,7 +33,8 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
* This iterator must be closed after use.
*
* @param key the key to return sessions for
- * @param earliestSessionEndTime
+ * @param earliestSessionEndTime the end timestamp of the earliest session to search for
+ * @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching key and aggregated values
* @throws NullPointerException If null is used for key.
*/
@@ -47,7 +48,8 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
- * @param earliestSessionEndTime
+ * @param earliestSessionEndTime the end timestamp of the earliest session to search for
+ * @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching keys and aggregated values
* @throws NullPointerException If null is used for any key.
*/
@@ -63,7 +65,8 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
/**
* Write the aggregated value for the provided key to the store
* @param sessionKey key of the session to write
- * @param aggregate the aggregated value for the session
+ * @param aggregate the aggregated value for the session, it can be null;
+ * if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for sessionKey.
*/
void put(final Windowed<K> sessionKey, final AGG aggregate);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 0ce6d9e..0aaa2b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -198,7 +198,8 @@ public class Stores {
* Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
* @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
- * @param valueSerde the value serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
+ * it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link WindowStore}
@@ -214,7 +215,8 @@ public class Stores {
* Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
* @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
- * @param valueSerde the value serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
+ * it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
@@ -230,7 +232,8 @@ public class Stores {
* Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
* @param supplier a {@link SessionBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
- * @param valueSerde the value serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
+ * it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link SessionStore}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
index 5fbe6b0..9cf70c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -19,7 +19,12 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
- * A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>>} instances of type <Byte, byte[]>.
+ * A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>} instances of type <Byte, byte[]>.
+ *
+ * For any stores implementing the {@link WindowStore WindowStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ *
+ * 1. Null value bytes in put operations should be treated as delete.
+ * 2. Null value bytes should never be returned in range query results.
*/
public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Bytes, byte[]>> {
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 6a4d5f6..d2f6ad8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -30,7 +30,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* Put a key-value pair with the current wall-clock time as the timestamp
* into the corresponding window
* @param key The key to associate the value to
- * @param value The value, it can be null
+ * @param value The value to update, it can be null;
+ * if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for key.
*/
void put(K key, V value);
@@ -38,7 +39,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
/**
* Put a key-value pair with the given timestamp into the corresponding window
* @param key The key to associate the value to
- * @param value The value, it can be null
+ * @param value The value; can be null
* @throws NullPointerException If null is used for key.
*/
void put(K key, V value, long timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f9ab3f1..82a6ac7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -91,7 +91,12 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
try {
context.setRecordContext(entry.recordContext());
if (flushListener != null) {
- final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
+ V oldValue = null;
+ if (sendOldValues) {
+ final byte[] oldBytesValue = underlying.get(entry.key());
+ oldValue = oldBytesValue == null ? null : serdes.valueFrom(oldBytesValue);
+ }
+ // we rely on underlying store to handle null new value bytes as deletes
underlying.put(entry.key(), entry.newValue());
flushListener.apply(serdes.keyFrom(entry.key().get()),
serdes.valueFrom(entry.newValue()),
@@ -141,6 +146,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public byte[] get(final Bytes key) {
+ Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
Lock theLock;
if (Thread.currentThread().equals(streamThread)) {
@@ -150,7 +156,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
theLock.lock();
try {
- Objects.requireNonNull(key);
return getInternal(key);
} finally {
theLock.unlock();
@@ -212,15 +217,15 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
validateStoreOpen();
lock.writeLock().lock();
try {
+ // for null bytes, we still put it into cache indicating tombstones
putInternal(key, value);
} finally {
lock.writeLock().unlock();
}
}
- private void putInternal(final Bytes rawKey, final byte[] value) {
- Objects.requireNonNull(rawKey, "key cannot be null");
- cache.put(cacheName, rawKey, new LRUCacheEntry(value, true, context.offset(),
+ private void putInternal(final Bytes key, final byte[] value) {
+ cache.put(cacheName, key, new LRUCacheEntry(value, true, context.offset(),
context.timestamp(), context.partition(), context.topic()));
}
@@ -242,9 +247,11 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ validateStoreOpen();
lock.writeLock().lock();
try {
for (KeyValue<Bytes, byte[]> entry : entries) {
+ Objects.requireNonNull(entry.key, "key cannot be null");
put(entry.key, entry.value);
}
} finally {
@@ -254,19 +261,22 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public byte[] delete(final Bytes key) {
+ Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
lock.writeLock().lock();
try {
- Objects.requireNonNull(key);
- final byte[] v = getInternal(key);
- cache.delete(cacheName, key);
- underlying.delete(key);
- return v;
+ return deleteInternal(key);
} finally {
lock.writeLock().unlock();
}
}
+ private byte[] deleteInternal(final Bytes key) {
+ final byte[] v = getInternal(key);
+ putInternal(key, null);
+ return v;
+ }
+
KeyValueStore<Bytes, byte[]> underlying() {
return underlying;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 05851e5..31b9d75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -160,8 +160,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
return findSessions(from, to, 0, Long.MAX_VALUE);
}
-
-
private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
final Bytes binaryKey = cacheFunction.key(entry.key());
final RecordContext current = context.recordContext();
@@ -183,8 +181,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
}
private AGG fetchPrevious(final Bytes rawKey, final Window window) {
- try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore
- .findSessions(rawKey, window.start(), window.end())) {
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore.findSessions(rawKey, window.start(), window.end())) {
if (!iterator.hasNext()) {
return null;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 75acd77..ad0bd99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -112,8 +112,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
context.setRecordContext(entry.recordContext());
try {
final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
- flushListener.apply(windowedKey,
- serdes.valueFrom(entry.newValue()), oldValue);
+ flushListener.apply(windowedKey, serdes.valueFrom(entry.newValue()), oldValue);
} finally {
context.setRecordContext(current);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 94ee275..0fdd3e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -45,6 +45,17 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
ProcessorStateManager.storeChangelogTopic(
context.applicationId(),
inner.name())));
+
+ // if the inner store is an LRU cache, add the eviction listener to log removed record
+ if (inner instanceof MemoryLRUCache) {
+ ((MemoryLRUCache<Bytes, byte[]>) inner).whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<Bytes, byte[]>() {
+ @Override
+ public void apply(Bytes key, byte[] value) {
+ // pass null to indicate removal
+ changeLogger.logChange(key, null);
+ }
+ });
+ }
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 929d584..2cdfc4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -63,7 +63,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
@Override
@SuppressWarnings("unchecked")
- public void init(ProcessorContext context, StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root) {
// construct the serde
this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
@@ -99,17 +99,21 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public synchronized V get(K key) {
+ public synchronized V get(final K key) {
return this.map.get(key);
}
@Override
- public synchronized void put(K key, V value) {
- this.map.put(key, value);
+ public synchronized void put(final K key, final V value) {
+ if (value == null) {
+ this.map.remove(key);
+ } else {
+ this.map.put(key, value);
+ }
}
@Override
- public synchronized V putIfAbsent(K key, V value) {
+ public synchronized V putIfAbsent(final K key, final V value) {
V originalValue = get(key);
if (originalValue == null) {
put(key, value);
@@ -118,18 +122,18 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public synchronized void putAll(List<KeyValue<K, V>> entries) {
+ public synchronized void putAll(final List<KeyValue<K, V>> entries) {
for (KeyValue<K, V> entry : entries)
put(entry.key, entry.value);
}
@Override
- public synchronized V delete(K key) {
+ public synchronized V delete(final K key) {
return this.map.remove(key);
}
@Override
- public synchronized KeyValueIterator<K, V> range(K from, K to) {
+ public synchronized KeyValueIterator<K, V> range(final K from, final K to) {
return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
}
@@ -158,7 +162,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
private final Iterator<Map.Entry<K, V>> iter;
- private InMemoryKeyValueIterator(Iterator<Map.Entry<K, V>> iter) {
+ private InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter) {
this.iter = iter;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index d076a05..40e2d43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -75,9 +75,9 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
// always wrap the store with the metered store
InnerMeteredKeyValueStore(final KeyValueStore<IK, IV> inner,
- final String metricScope,
- final TypeConverter<K, IK, V, IV> typeConverter,
- final Time time) {
+ final String metricScope,
+ final TypeConverter<K, IK, V, IV> typeConverter,
+ final Time time) {
super(inner);
this.inner = inner;
this.metricScope = metricScope;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index f1aa96f..2785540 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -61,9 +61,12 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
// in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this.
private volatile boolean open = true;
- EldestEntryRemovalListener<K, V> listener;
+ private EldestEntryRemovalListener<K, V> listener;
- MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
+ MemoryLRUCache(final String name,
+ final int maxCacheSize,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
this.name = name;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
@@ -87,7 +90,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
}
- MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+ MemoryLRUCache<K, V> whenEldestRemoved(final EldestEntryRemovalListener<K, V> listener) {
this.listener = listener;
return this;
@@ -114,7 +117,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
restoring = true;
// check value for null, to avoid deserialization error.
if (value == null) {
- put(serdes.keyFrom(key), null);
+ delete(serdes.keyFrom(key));
} else {
put(serdes.keyFrom(key), serdes.valueFrom(value));
}
@@ -134,19 +137,24 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
@Override
- public synchronized V get(K key) {
+ public synchronized V get(final K key) {
Objects.requireNonNull(key);
+
return this.map.get(key);
}
@Override
- public synchronized void put(K key, V value) {
+ public synchronized void put(final K key, final V value) {
Objects.requireNonNull(key);
- this.map.put(key, value);
+ if (value == null) {
+ this.map.remove(key);
+ } else {
+ this.map.put(key, value);
+ }
}
@Override
- public synchronized V putIfAbsent(K key, V value) {
+ public synchronized V putIfAbsent(final K key, final V value) {
Objects.requireNonNull(key);
V originalValue = get(key);
if (originalValue == null) {
@@ -156,23 +164,22 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
@Override
- public void putAll(List<KeyValue<K, V>> entries) {
+ public void putAll(final List<KeyValue<K, V>> entries) {
for (KeyValue<K, V> entry : entries)
put(entry.key, entry.value);
}
@Override
- public synchronized V delete(K key) {
+ public synchronized V delete(final K key) {
Objects.requireNonNull(key);
- V value = this.map.remove(key);
- return value;
+ return this.map.remove(key);
}
/**
* @throws UnsupportedOperationException
*/
@Override
- public KeyValueIterator<K, V> range(K from, K to) {
+ public KeyValueIterator<K, V> range(final K from, final K to) {
throw new UnsupportedOperationException("MemoryLRUCache does not support range() function.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 268042d..9500e8e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -32,13 +32,6 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
}
@Override
- public MemoryNavigableLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
- this.listener = listener;
-
- return this;
- }
-
- @Override
public KeyValueIterator<K, V> range(K from, K to) {
final TreeMap<K, V> treeMap = toTreeMap();
return new DelegatingPeekingKeyValueIterator<>(name(), new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from, true, to, true).iterator(), treeMap));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
index 35647b7..3a30c10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
@@ -62,9 +62,7 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS
@Override
public byte[] innerValue(final V value) {
- if (value == null) {
- return null;
- }
+ // do not check on null, but rely on user serde to handle it
return serdes.rawValue(value);
}
@@ -80,12 +78,12 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS
@Override
public V outerValue(final byte[] value) {
- return serdes.valueFrom(value);
+ return value == null ? null : serdes.valueFrom(value);
}
@Override
- public KeyValue<K, V> outerKeyValue(final KeyValue<Bytes, byte[]> from) {
- return KeyValue.pair(serdes.keyFrom(from.key.get()), serdes.valueFrom(from.value));
+ public KeyValue<K, V> outerKeyValue(final KeyValue<Bytes, byte[]> keyValue) {
+ return KeyValue.pair(serdes.keyFrom(keyValue.key.get()), keyValue.value == null ? null : serdes.valueFrom(keyValue.value));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 8d9065c..1a9ac20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -92,7 +92,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
final long earliestSessionEndTime,
final long latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
- final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
+ final Bytes bytesKey = keyBytes(key);
return new MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKey,
earliestSessionEndTime,
latestSessionStartTime),
@@ -109,8 +109,8 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
final long latestSessionStartTime) {
Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
Objects.requireNonNull(keyTo, "keyTo cannot be null");
- final Bytes bytesKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom));
- final Bytes bytesKeyTo = Bytes.wrap(serdes.rawKey(keyTo));
+ final Bytes bytesKeyFrom = keyBytes(keyFrom);
+ final Bytes bytesKeyTo = keyBytes(keyTo);
return new MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKeyFrom,
bytesKeyTo,
earliestSessionEndTime,
@@ -126,7 +126,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
Objects.requireNonNull(sessionKey, "sessionKey can't be null");
final long startNs = time.nanoseconds();
try {
- final Bytes key = Bytes.wrap(serdes.rawKey(sessionKey.key()));
+ final Bytes key = keyBytes(sessionKey.key());
inner.remove(new Windowed<>(key, sessionKey.window()));
} finally {
this.metrics.recordLatency(removeTime, startNs, time.nanoseconds());
@@ -138,13 +138,17 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
Objects.requireNonNull(sessionKey, "sessionKey can't be null");
long startNs = time.nanoseconds();
try {
- final Bytes key = Bytes.wrap(serdes.rawKey(sessionKey.key()));
+ final Bytes key = keyBytes(sessionKey.key());
this.inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
}
+ private Bytes keyBytes(final K key) {
+ return Bytes.wrap(serdes.rawKey(key));
+ }
+
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
Objects.requireNonNull(key, "key cannot be null");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 4c7e61b..67ec915 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -229,11 +229,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
public synchronized V get(K key) {
validateStoreOpen();
byte[] byteValue = getInternal(serdes.rawKey(key));
- if (byteValue == null) {
- return null;
- } else {
- return serdes.valueFrom(byteValue);
- }
+ return byteValue == null ? null : serdes.valueFrom(byteValue);
}
private void validateStoreOpen() {
@@ -341,11 +337,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
for (KeyValue<K, V> entry : entries) {
Objects.requireNonNull(entry.key, "key cannot be null");
final byte[] rawKey = serdes.rawKey(entry.key);
- if (entry.value == null) {
+ final byte[] rawValue = serdes.rawValue(entry.value);
+ if (rawValue == null) {
batch.remove(rawKey);
} else {
- final byte[] value = serdes.rawValue(entry.value);
- batch.put(rawKey, value);
+ batch.put(rawKey, rawValue);
}
}
db.write(wOptions, batch);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 19cf319..a533185 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -62,8 +62,8 @@ public interface SegmentedBytesStore extends StateStore {
/**
* Gets all the key-value pairs that belong to the windows within the given time range.
*
- * @param timeFrom the beginning of the time slot from which to search
- * @param timeTo the end of the time slot from which to search
+ * @param from the beginning of the time slot from which to search
+ * @param to the end of the time slot from which to search
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if null is used for any key
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
deleted file mode 100644
index 2c050f5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.NoSuchElementException;
-
-class SerializedKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
-
- private final KeyValueIterator<Bytes, byte[]> bytesIterator;
- private final StateSerdes<K, V> serdes;
-
- SerializedKeyValueIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator,
- final StateSerdes<K, V> serdes) {
-
- this.bytesIterator = bytesIterator;
- this.serdes = serdes;
- }
-
- @Override
- public void close() {
- bytesIterator.close();
- }
-
- @Override
- public K peekNextKey() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- final Bytes bytes = bytesIterator.peekNextKey();
- return serdes.keyFrom(bytes.get());
- }
-
- @Override
- public boolean hasNext() {
- return bytesIterator.hasNext();
- }
-
- @Override
- public KeyValue<K, V> next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- final KeyValue<Bytes, byte[]> next = bytesIterator.next();
- return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value));
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
index 4fd6f3e..4cb85d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
@@ -149,8 +149,9 @@ class WindowStoreIteratorWrapper<K, V> {
final StateSerdes<K, V> serdes;
final long windowSize;
- WrappedKeyValueIterator(
- KeyValueIterator<Bytes, byte[]> bytesIterator, StateSerdes<K, V> serdes, long windowSize) {
+ WrappedKeyValueIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator,
+ final StateSerdes<K, V> serdes,
+ final long windowSize) {
this.bytesIterator = bytesIterator;
this.serdes = serdes;
this.windowSize = windowSize;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
index 6fd9636..c5ea70b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
@@ -77,7 +77,8 @@ class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>,
@Override
public KeyValue<Windowed<K>, V> next() {
final KeyValue<Bytes, byte[]> next = bytesIterator.next();
- return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value));
+ return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()),
+ serdes.valueFrom(next.value));
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 01236ad..657e05d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -53,8 +53,10 @@ public class KTableFilterTest {
stateDir = TestUtils.tempDirectory("kafka-test");
}
- private void doTestKTable(final StreamsBuilder builder, final KTable<String, Integer> table2,
- final KTable<String, Integer> table3, final String topic1) {
+ private void doTestKTable(final StreamsBuilder builder,
+ final KTable<String, Integer> table2,
+ final KTable<String, Integer> table3,
+ final String topic) {
MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
@@ -62,13 +64,13 @@ public class KTableFilterTest {
driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
- driver.process(topic1, "A", 1);
- driver.process(topic1, "B", 2);
- driver.process(topic1, "C", 3);
- driver.process(topic1, "D", 4);
+ driver.process(topic, "A", 1);
+ driver.process(topic, "B", 2);
+ driver.process(topic, "C", 3);
+ driver.process(topic, "D", 4);
driver.flushState();
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
+ driver.process(topic, "A", null);
+ driver.process(topic, "B", null);
driver.flushState();
proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 398c4c5..0583e91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -32,12 +36,11 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
public abstract class AbstractKeyValueStoreTest {
- protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
- Class<K> keyClass, Class<V> valueClass,
- boolean useContextSerdes);
+ protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);
protected MockProcessorContext context;
protected KeyValueStore<Integer, String> store;
@@ -48,7 +51,7 @@ public abstract class AbstractKeyValueStoreTest {
driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
context = (MockProcessorContext) driver.context();
context.setTime(10);
- store = createKeyValueStore(context, Integer.class, String.class, false);
+ store = createKeyValueStore(context);
}
@After
@@ -68,6 +71,66 @@ public abstract class AbstractKeyValueStoreTest {
}
@Test
+ public void shouldNotIncludeDeletedFromRangeResult() {
+ store.close();
+
+ final Serializer<String> serializer = new StringSerializer() {
+ private int numCalls = 0;
+
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ if (++numCalls > 3) {
+ fail("Value serializer is called; it should never happen");
+ }
+
+ return super.serialize(topic, data);
+ }
+ };
+
+ context.setValueSerde(Serdes.serdeFrom(serializer, new StringDeserializer()));
+ store = createKeyValueStore(driver.context());
+
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.delete(0);
+ store.delete(1);
+
+ // should not include deleted records in iterator
+ final Map<Integer, String> expectedContents = Collections.singletonMap(2, "two");
+ assertEquals(expectedContents, getContents(store.all()));
+ }
+
+ @Test
+ public void shouldDeleteIfSerializedValueIsNull() {
+ store.close();
+
+ final Serializer<String> serializer = new StringSerializer() {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ if (data.equals("null")) {
+ // will be serialized to null bytes, indicating deletes
+ return null;
+ }
+ return super.serialize(topic, data);
+ }
+ };
+
+ context.setValueSerde(Serdes.serdeFrom(serializer, new StringDeserializer()));
+ store = createKeyValueStore(driver.context());
+
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.put(0, "null");
+ store.put(1, "null");
+
+ // should not include deleted records in iterator
+ final Map<Integer, String> expectedContents = Collections.singletonMap(2, "two");
+ assertEquals(expectedContents, getContents(store.all()));
+ }
+
+ @Test
public void testPutGetRange() {
// Verify that the store reads and writes correctly ...
store.put(0, "zero");
@@ -157,7 +220,7 @@ public abstract class AbstractKeyValueStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+ store = createKeyValueStore(driver.context());
context.restore(store.name(), driver.restoredEntries());
// Verify that the store's contents were properly restored ...
@@ -179,7 +242,7 @@ public abstract class AbstractKeyValueStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+ store = createKeyValueStore(driver.context());
context.restore(store.name(), driver.restoredEntries());
// Verify that the store's contents were properly restored ...
assertEquals(0, driver.checkForRestoredEntries(store));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 97a2fbf..3ff343a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.After;
@@ -84,22 +86,14 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context,
- final Class<K> keyClass,
- final Class<V> valueClass,
- final boolean useContextSerdes) {
- final String storeName = "cache-store";
-
-
- final Stores.PersistentKeyValueFactory<K, V> factory = Stores
- .create(storeName)
- .withKeys(Serdes.serdeFrom(keyClass))
- .withValues(Serdes.serdeFrom(valueClass))
- .persistent()
- .enableCaching();
-
-
- final KeyValueStore<K, V> store = (KeyValueStore<K, V>) factory.build().get();
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+ final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore("cache-store"),
+ (Serde<K>) context.keySerde(),
+ (Serde<V>) context.valueSerde())
+ .withCachingEnabled();
+
+ final KeyValueStore<K, V> store = (KeyValueStore<K, V>) storeBuilder.build();
final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index d24a90f..adaab00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -36,27 +35,11 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- ProcessorContext context,
- Class<K> keyClass,
- Class<V> valueClass,
- boolean useContextSerdes) {
-
- final Serde<K> keySerde;
- final Serde<V> valueSerde;
-
- if (useContextSerdes) {
- keySerde = (Serde<K>) context.keySerde();
- valueSerde = (Serde<V>) context.valueSerde();
- } else {
- keySerde = Serdes.serdeFrom(keyClass);
- valueSerde = Serdes.serdeFrom(valueClass);
- }
-
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
- keySerde,
- valueSerde)
+ (Serde<K>) context.keySerde(),
+ (Serde<V>) context.valueSerde())
.withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
final StateStore store = storeBuilder.build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 541c003..ef5d6dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
@@ -30,22 +32,15 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- ProcessorContext context,
- Class<K> keyClass,
- Class<V> valueClass,
- boolean useContextSerdes) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+ final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("my-store"),
+ (Serde<K>) context.keySerde(),
+ (Serde<V>) context.valueSerde());
- StateStoreSupplier supplier;
- if (useContextSerdes) {
- supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().build();
- } else {
- supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
- }
-
- KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+ final StateStore store = storeBuilder.build();
store.init(context, store);
- return store;
+ return (KeyValueStore<K, V>) store;
}
@Test
@@ -59,7 +54,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
driver.addEntryToRestoreLog(3, "three");
driver.addEntryToRestoreLog(0, null);
- store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+ store = createKeyValueStore(driver.context());
context.restore(store.name(), driver.restoredEntries());
assertEquals(3, driver.sizeOf(store));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 7dda585..2d39ae7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
@@ -36,22 +38,17 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- ProcessorContext context,
- Class<K> keyClass,
- Class<V> valueClass,
- boolean useContextSerdes) {
-
- StateStoreSupplier supplier;
- if (useContextSerdes) {
- supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().maxEntries(10).build();
- } else {
- supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build();
- }
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+
+ final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ Stores.lruMap("my-store", 10),
+ (Serde<K>) context.keySerde(),
+ (Serde<V>) context.valueSerde());
- KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+ final StateStore store = storeBuilder.build();
store.init(context, store);
- return store;
+
+ return (KeyValueStore<K, V>) store;
}
@Test
@@ -157,7 +154,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
// Create the store, which should register with the context and automatically
// receive the restore entries ...
- store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+ store = createKeyValueStore(driver.context());
context.restore(store.name(), driver.restoredEntries());
// Verify that the store's changelog does not get more appends ...
assertEquals(0, driver.numFlushedEntryStored());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 5aaf82f..ec2e338 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -16,11 +16,14 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
import org.rocksdb.Options;
@@ -36,29 +39,15 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context,
- final Class<K> keyClass,
- final Class<V> valueClass,
- final boolean useContextSerdes) {
- final Stores.PersistentKeyValueFactory<?, ?> factory;
- if (useContextSerdes) {
- factory = Stores
- .create("my-store")
- .withKeys(context.keySerde())
- .withValues(context.valueSerde())
- .persistent();
-
- } else {
- factory = Stores
- .create("my-store")
- .withKeys(keyClass)
- .withValues(valueClass)
- .persistent();
- }
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+ final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore("my-store"),
+ (Serde<K>) context.keySerde(),
+ (Serde<V>) context.valueSerde());
- final KeyValueStore<K, V> store = (KeyValueStore<K, V>) factory.build().get();
+ final StateStore store = storeBuilder.build();
store.init(context, store);
- return store;
+ return (KeyValueStore<K, V>) store;
}
public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java
deleted file mode 100644
index d11b9b4..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.KeyValueIteratorStub;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class SerializedKeyValueIteratorTest {
-
- private final StateSerdes<String, String> serdes = new StateSerdes<>("blah", Serdes.String(), Serdes.String());
- private final Iterator<KeyValue<Bytes, byte[]>> iterator
- = Arrays.asList(KeyValue.pair(Bytes.wrap("hi".getBytes()), "there".getBytes()),
- KeyValue.pair(Bytes.wrap("hello".getBytes()), "world".getBytes()))
- .iterator();
- private final DelegatingPeekingKeyValueIterator<Bytes, byte[]> peeking
- = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(iterator));
- private final SerializedKeyValueIterator<String, String> serializedKeyValueIterator
- = new SerializedKeyValueIterator<>(peeking, serdes);
-
- @Test
- public void shouldReturnTrueOnHasNextWhenMoreResults() {
- assertTrue(serializedKeyValueIterator.hasNext());
- }
-
- @Test
- public void shouldReturnNextValueWhenItExists() {
- assertThat(serializedKeyValueIterator.next(), equalTo(KeyValue.pair("hi", "there")));
- assertThat(serializedKeyValueIterator.next(), equalTo(KeyValue.pair("hello", "world")));
- }
-
- @Test
- public void shouldReturnFalseOnHasNextWhenNoMoreResults() {
- advanceIteratorToEnd();
- assertFalse(serializedKeyValueIterator.hasNext());
- }
-
- @Test
- public void shouldThrowNoSuchElementOnNextWhenIteratorExhausted() {
- advanceIteratorToEnd();
- try {
- serializedKeyValueIterator.next();
- fail("Expected NoSuchElementException on exhausted iterator");
- } catch (final NoSuchElementException nse) {
- // pass
- }
- }
-
- @Test
- public void shouldPeekNextKey() {
- assertThat(serializedKeyValueIterator.peekNextKey(), equalTo("hi"));
- serializedKeyValueIterator.next();
- assertThat(serializedKeyValueIterator.peekNextKey(), equalTo("hello"));
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void shouldThrowUnsupportedOperationOnRemove() {
- serializedKeyValueIterator.remove();
- }
-
- private void advanceIteratorToEnd() {
- serializedKeyValueIterator.next();
- serializedKeyValueIterator.next();
- }
-
-
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index ce6cca8..06137fb 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -50,12 +50,12 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
private final File stateDir;
private final Metrics metrics;
- private final Serde<?> keySerde;
- private final Serde<?> valSerde;
private final RecordCollector.Supplier recordCollectorSupplier;
private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
+ private Serde<?> keySerde;
+ private Serde<?> valSerde;
private long timestamp = -1L;
public MockProcessorContext(final File stateDir,
@@ -121,6 +121,14 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
return recordCollector;
}
+ public void setKeySerde(final Serde<?> keySerde) {
+ this.keySerde = keySerde;
+ }
+
+ public void setValueSerde(final Serde<?> valSerde) {
+ this.valSerde = valSerde;
+ }
+
// serdes will override whatever specified in the configs
@Override
public Serde<?> keySerde() {
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.