You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/01 07:28:28 UTC
[kafka] branch trunk updated: KAFAK-3522: add API to create
timestamped stores (#6601)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c5665e6 KAFAK-3522: add API to create timestamped stores (#6601)
c5665e6 is described below
commit c5665e6945c8e63ddfb1056c4893f16cae1f6f99
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed May 1 09:28:10 2019 +0200
KAFAK-3522: add API to create timestamped stores (#6601)
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Bruno Cadonna <br...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../org/apache/kafka/streams/state/Stores.java | 261 ++++++++++++++++-----
.../kafka/streams/state/TimestampedBytesStore.java | 3 +
...ValueToTimestampedKeyValueByteStoreAdapter.java | 131 +++++++++++
...yValueToTimestampedKeyValueIteratorAdapter.java | 58 +++++
.../state/internals/RocksDBTimestampedStore.java | 13 +-
...ils.java => RocksDBTimestampedWindowStore.java} | 19 +-
.../internals/RocksDbWindowBytesStoreSupplier.java | 37 ++-
.../internals/TimestampedKeyValueStoreBuilder.java | 7 +-
.../internals/TimestampedWindowStoreBuilder.java | 7 +-
.../WindowToTimestampedWindowByteStoreAdapter.java | 152 ++++++++++++
.../org/apache/kafka/streams/state/StoresTest.java | 142 ++++++++---
.../TimestampedKeyValueStoreBuilderTest.java | 38 ++-
.../TimestampedWindowStoreBuilderTest.java | 183 +++++++++++++++
.../state/internals/WindowStoreBuilderTest.java | 34 +--
14 files changed, 929 insertions(+), 156 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c85fe03..e40251d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplie
import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import java.time.Duration;
@@ -75,13 +77,18 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
* }</pre>
*/
@InterfaceStability.Evolving
-public class Stores {
+public final class Stores {
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
+ * If you want to create a {@link TimestampedKeyValueStore} you should use
+ * {@link #persistentTimestampedKeyValueStore(String)} to create a store supplier instead.
+ *
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
- * to build a persistent store
+ * to build a persistent key-value store
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
Objects.requireNonNull(name, "name cannot be null");
@@ -89,7 +96,28 @@ public class Stores {
}
/**
+ * Create a persistent {@link KeyValueBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a
+ * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
+ * If you want to create a {@link KeyValueStore} you should use
+ * {@link #persistentKeyValueStore(String)} to create a store supplier instead.
+ *
+ * @param name name of the store (cannot be {@code null})
+ * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
+ * to build a persistent key-(timestamp/value) store
+ */
+ public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name) {
+ Objects.requireNonNull(name, "name cannot be null");
+ return new RocksDbKeyValueBytesStoreSupplier(name, true);
+ }
+
+ /**
* Create an in-memory {@link KeyValueBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}
+ * or {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
+ *
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
* build an in-memory store
@@ -116,6 +144,10 @@ public class Stores {
/**
* Create a LRU Map {@link KeyValueBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}
+ * or {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
+ *
* @param name name of the store (cannot be {@code null})
* @param maxCacheSize maximum number of items in the LRU (cannot be negative)
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
@@ -145,49 +177,13 @@ public class Stores {
}
/**
- * Create an in-memory {@link WindowBytesStoreSupplier}.
- * @param name name of the store (cannot be {@code null})
- * @param retentionPeriod length of time to retain data in the store (cannot be negative)
- * Note that the retention period must be at least long enough to contain the
- * windowed data's entire life cycle, from window-start through window-end,
- * and for the entire grace period.
- * @param windowSize size of the windows (cannot be negative)
- * @return an instance of {@link WindowBytesStoreSupplier}
- * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
- */
- public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
- final Duration retentionPeriod,
- final Duration windowSize,
- final boolean retainDuplicates) throws IllegalArgumentException {
- Objects.requireNonNull(name, "name cannot be null");
- final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
- final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
- final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
- final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
-
- Objects.requireNonNull(name, "name cannot be null");
- if (retentionMs < 0L) {
- throw new IllegalArgumentException("retentionPeriod cannot be negative");
- }
- if (windowSizeMs < 0L) {
- throw new IllegalArgumentException("windowSize cannot be negative");
- }
- if (windowSizeMs > retentionMs) {
- throw new IllegalArgumentException("The retention period of the window store "
- + name + " must be no smaller than its window size. Got size=["
- + windowSize + "], retention=[" + retentionPeriod + "]");
- }
-
- return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates);
- }
-
- /**
* Create a persistent {@link WindowBytesStoreSupplier}.
+ *
* @param name name of the store (cannot be {@code null})
- * @param retentionPeriod length of time to retain data in the store (cannot be negative).
- * Note that the retention period must be at least long enough to contain the
+ * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+ * (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
- * and for the entire grace period.
+ * and for the entire grace period)
* @param numSegments number of db segments (cannot be zero or negative)
* @param windowSize size of the windows that are stored (cannot be negative). Note: the window size
* is not stored with the records, so this value is used to compute the keys that
@@ -214,17 +210,23 @@ public class Stores {
retentionPeriod,
windowSize,
retainDuplicates,
- legacySegmentInterval
+ legacySegmentInterval,
+ false
);
}
/**
* Create a persistent {@link WindowBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
+ * If you want to create a {@link TimestampedWindowStore} you should use
+ * {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
+ *
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
- * Note that the retention period must be at least long enough to contain the
+ * (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
- * and for the entire grace period.
+ * and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
@@ -234,6 +236,39 @@ public class Stores {
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
+ return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, false);
+ }
+
+ /**
+ * Create a persistent {@link WindowBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a
+ * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
+ * If you want to create a {@link WindowStore} you should use
+ * {@link #persistentWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
+ *
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+ * (note that the retention period must be at least long enough to contain the
+ * windowed data's entire life cycle, from window-start through window-end,
+ * and for the entire grace period)
+ * @param windowSize size of the windows (cannot be negative)
+ * @param retainDuplicates whether or not to retain duplicates.
+ * @return an instance of {@link WindowBytesStoreSupplier}
+ * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
+ */
+ public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates) throws IllegalArgumentException {
+ return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, true);
+ }
+
+ private static WindowBytesStoreSupplier persistentWindowStore(final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates,
+ final boolean timestampedStore) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
@@ -242,14 +277,15 @@ public class Stores {
final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
- return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval);
+ return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval, timestampedStore);
}
private static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
- final long segmentInterval) {
+ final long segmentInterval,
+ final boolean timestampedStore) {
Objects.requireNonNull(name, "name cannot be null");
if (retentionPeriod < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
@@ -262,8 +298,8 @@ public class Stores {
}
if (windowSize > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the window store "
- + name + " must be no smaller than its window size. Got size=["
- + windowSize + "], retention=[" + retentionPeriod + "]");
+ + name + " must be no smaller than its window size. Got size=["
+ + windowSize + "], retention=[" + retentionPeriod + "]");
}
return new RocksDbWindowBytesStoreSupplier(
@@ -272,7 +308,49 @@ public class Stores {
segmentInterval,
windowSize,
retainDuplicates,
- false);
+ timestampedStore);
+ }
+
+ /**
+ * Create an in-memory {@link WindowBytesStoreSupplier}.
+ * <p>
+ * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)} or
+ * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
+ *
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+ * Note that the retention period must be at least long enough to contain the
+ * windowed data's entire life cycle, from window-start through window-end,
+ * and for the entire grace period.
+ * @param windowSize size of the windows (cannot be negative)
+ * @return an instance of {@link WindowBytesStoreSupplier}
+ * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
+ */
+ public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates) throws IllegalArgumentException {
+ Objects.requireNonNull(name, "name cannot be null");
+
+ final String repartitionPeriodErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+ final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, repartitionPeriodErrorMessagePrefix);
+ if (retentionMs < 0L) {
+ throw new IllegalArgumentException("retentionPeriod cannot be negative");
+ }
+
+ final String windowSizeErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+ final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, windowSizeErrorMessagePrefix);
+ if (windowSizeMs < 0L) {
+ throw new IllegalArgumentException("windowSize cannot be negative");
+ }
+
+ if (windowSizeMs > retentionMs) {
+ throw new IllegalArgumentException("The retention period of the window store "
+ + name + " must be no smaller than its window size. Got size=["
+ + windowSize + "], retention=[" + retentionPeriod + "]");
+ }
+
+ return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates);
}
/**
@@ -297,11 +375,12 @@ public class Stores {
/**
* Create a persistent {@link SessionBytesStoreSupplier}.
+ *
* @param name name of the store (cannot be {@code null})
* @param retentionPeriodMs length ot time to retain data in the store (cannot be negative)
- * Note that the retention period must be at least long enough to contain the
+ * (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
- * and for the entire grace period.
+ * and for the entire grace period)
* @return an instance of a {@link SessionBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
*/
@@ -317,6 +396,7 @@ public class Stores {
/**
* Create a persistent {@link SessionBytesStoreSupplier}.
+ *
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length ot time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
@@ -331,12 +411,58 @@ public class Stores {
return persistentSessionStore(name, ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix));
}
+ /**
+ * Creates a {@link StoreBuilder} that can be used to build a {@link KeyValueStore}.
+ * <p>
+ * The provided supplier should <strong>not</strong> be a supplier for
+ * {@link TimestampedKeyValueStore TimestampedKeyValueStores}.
+ *
+ * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
+ * it is treated as delete
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
+ */
+ public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
+ return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+ }
+
+ /**
+ * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedKeyValueStore}.
+ * <p>
+ * The provided supplier should <strong>not</strong> be a supplier for
+ * {@link KeyValueStore KeyValueStores}. For this case, passed in timestamps will be dropped and not stored in the
+ * key-value-store. On read, no valid timestamp but a dummy timestamp will be returned.
+ *
+ * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
+ * it is treated as delete
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
+ */
+ public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
+ return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+ }
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
+ * <p>
+ * The provided supplier should <strong>not</strong> be a supplier for
+ * {@link TimestampedWindowStore TimestampedWindowStores}.
+ *
* @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
- * @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
+ * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
@@ -350,37 +476,42 @@ public class Stores {
}
/**
- * Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
- * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
+ * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedWindowStore}.
+ * <p>
+ * The provided supplier should <strong>not</strong> be a supplier for
+ * {@link WindowStore WindowStores}. For this case, passed in timestamps will be dropped and not stored in the
+ * windows-store. On read, no valid timestamp but a dummy timestamp will be returned.
+ *
+ * @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
- * @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
+ * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
- * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
+ * @return an instance of {@link StoreBuilder} that can build a {@link TimestampedWindowStore}
*/
- public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
Objects.requireNonNull(supplier, "supplier cannot be null");
- return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+ return new TimestampedWindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
+ *
* @param supplier a {@link SessionBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
- * @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
+ * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link SessionStore}
- * */
+ */
public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
-}
-
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java
index 5b5fbc5..e609b70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java
@@ -22,6 +22,9 @@ import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP;
public interface TimestampedBytesStore {
static byte[] convertToTimestampedFormat(final byte[] plainValue) {
+ if (plainValue == null) {
+ return null;
+ }
return ByteBuffer
.allocate(8 + plainValue.length)
.putLong(NO_TIMESTAMP)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
new file mode 100644
index 0000000..62cfac3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+
+import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
+import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore} and {@link KeyValueStore}.
+ * <p>
+ * If a user provides a supplier for plain {@code KeyValueStores} via
+ * {@link org.apache.kafka.streams.kstream.Materialized#as(KeyValueBytesStoreSupplier)} this adapter is used to
+ * translate between old a new {@code byte[]} format of the value.
+ *
+ * @see KeyValueToTimestampedKeyValueIteratorAdapter
+ */
+public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueStore<Bytes, byte[]> {
+ final KeyValueStore<Bytes, byte[]> store;
+
+ KeyValueToTimestampedKeyValueByteStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {
+ if (!store.persistent()) {
+ throw new IllegalArgumentException("Provided store must be a persistent store, but it is not.");
+ }
+ this.store = store;
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] valueWithTimestamp) {
+ store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+ }
+
+ @Override
+ public byte[] putIfAbsent(final Bytes key,
+ final byte[] valueWithTimestamp) {
+ return convertToTimestampedFormat(store.putIfAbsent(
+ key,
+ valueWithTimestamp == null ? null : rawValue(valueWithTimestamp)));
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
+ final byte[] valueWithTimestamp = entry.value;
+ store.put(entry.key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+ }
+ }
+
+ @Override
+ public byte[] delete(final Bytes key) {
+ return convertToTimestampedFormat(store.delete(key));
+ }
+
+ @Override
+ public String name() {
+ return store.name();
+ }
+
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ store.init(context, root);
+ }
+
+ @Override
+ public void flush() {
+ store.flush();
+ }
+
+ @Override
+ public void close() {
+ store.close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return true;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return store.isOpen();
+ }
+
+ @Override
+ public byte[] get(final Bytes key) {
+ return convertToTimestampedFormat(store.get(key));
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to) {
+ return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.range(from, to));
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> all() {
+ return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all());
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return store.approximateNumEntries();
+ }
+
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java
new file mode 100644
index 0000000..7bdcb5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore} and
+ * {@link org.apache.kafka.streams.state.KeyValueStore}.
+ *
+ * @see KeyValueToTimestampedKeyValueByteStoreAdapter
+ */
+class KeyValueToTimestampedKeyValueIteratorAdapter<K> implements KeyValueIterator<K, byte[]> {
+ private final KeyValueIterator<K, byte[]> innerIterator;
+
+ KeyValueToTimestampedKeyValueIteratorAdapter(final KeyValueIterator<K, byte[]> innerIterator) {
+ this.innerIterator = innerIterator;
+ }
+
+ @Override
+ public void close() {
+ innerIterator.close();
+ }
+
+ @Override
+ public K peekNextKey() {
+ return innerIterator.peekNextKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return innerIterator.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, byte[]> next() {
+ final KeyValue<K, byte[]> plainKeyValue = innerIterator.next();
+ return KeyValue.pair(plainKeyValue.key, convertToTimestampedFormat(plainKeyValue.value));
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index f52033b..5466ce8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
@@ -42,12 +43,12 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import static java.util.Arrays.asList;
-import static org.apache.kafka.streams.state.internals.StoreProxyUtils.getValueWithUnknownTimestamp;
+import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
/**
* A persistent key-(value-timestamp) store based on RocksDB.
*/
-public class RocksDBTimestampedStore extends RocksDBStore {
+public class RocksDBTimestampedStore extends RocksDBStore implements TimestampedBytesStore {
private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStore.class);
RocksDBTimestampedStore(final String name) {
@@ -160,7 +161,7 @@ public class RocksDBTimestampedStore extends RocksDBStore {
final byte[] plainValue = db.get(oldColumnFamily, key);
if (plainValue != null) {
- final byte[] valueWithUnknownTimestamp = getValueWithUnknownTimestamp(plainValue);
+ final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
// this does only work, because the changelog topic contains correct data already
// for other format changes, we cannot take this short cut and can only migrate data
// from old to new store on put()
@@ -180,7 +181,7 @@ public class RocksDBTimestampedStore extends RocksDBStore {
final byte[] plainValue = db.get(oldColumnFamily, key);
if (plainValue != null) {
- return getValueWithUnknownTimestamp(plainValue);
+ return convertToTimestampedFormat(plainValue);
}
return null;
@@ -319,12 +320,12 @@ public class RocksDBTimestampedStore extends RocksDBStore {
}
} else {
if (nextWithTimestamp == null) {
- next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
+ next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
nextNoTimestamp = null;
iterNoTimestamp.next();
} else {
if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {
- next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
+ next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
nextNoTimestamp = null;
iterNoTimestamp.next();
} else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java
similarity index 67%
rename from streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java
rename to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java
index e78b382..b96748e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java
@@ -16,21 +16,14 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.nio.ByteBuffer;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
-import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP;
+class RocksDBTimestampedWindowStore extends RocksDBWindowStore implements TimestampedBytesStore {
-class StoreProxyUtils {
-
- static byte[] getValueWithUnknownTimestamp(final byte[] rawValue) {
- if (rawValue == null) {
- return null;
- }
- return ByteBuffer
- .allocate(8 + rawValue.length)
- .putLong(NO_TIMESTAMP)
- .put(rawValue)
- .array();
+ RocksDBTimestampedWindowStore(final SegmentedBytesStore bytesStore,
+ final boolean retainDuplicates,
+ final long windowSize) {
+ super(bytesStore, retainDuplicates, windowSize);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index b2e8c11..79f1ee3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -49,28 +49,27 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
@Override
public WindowStore<Bytes, byte[]> get() {
- final SegmentedBytesStore segmentedBytesStore;
if (!returnTimestampedStore) {
- segmentedBytesStore = new RocksDBSegmentedBytesStore(
- name,
- metricsScope(),
- retentionPeriod,
- segmentInterval,
- new WindowKeySchema()
- );
+ return new RocksDBWindowStore(
+ new RocksDBSegmentedBytesStore(
+ name,
+ metricsScope(),
+ retentionPeriod,
+ segmentInterval,
+ new WindowKeySchema()),
+ retainDuplicates,
+ windowSize);
} else {
- segmentedBytesStore = new RocksDBTimestampedSegmentedBytesStore(
- name,
- metricsScope(),
- retentionPeriod,
- segmentInterval,
- new WindowKeySchema()
- );
+ return new RocksDBTimestampedWindowStore(
+ new RocksDBTimestampedSegmentedBytesStore(
+ name,
+ metricsScope(),
+ retentionPeriod,
+ segmentInterval,
+ new WindowKeySchema()),
+ retainDuplicates,
+ windowSize);
}
- return new RocksDBWindowStore(
- segmentedBytesStore,
- retainDuplicates,
- windowSize);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index 5a0bf22..f43e4e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -46,8 +47,12 @@ public class TimestampedKeyValueStoreBuilder<K, V>
@Override
public TimestampedKeyValueStore<K, V> build() {
+ KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
+ if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
+ store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store);
+ }
return new MeteredTimestampedKeyValueStore<>(
- maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+ maybeWrapCaching(maybeWrapLogging(store)),
storeSupplier.metricsScope(),
time,
keySerde,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index dcb0d44..2c7c950 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -42,8 +43,12 @@ public class TimestampedWindowStoreBuilder<K, V>
@Override
public TimestampedWindowStore<K, V> build() {
+ WindowStore<Bytes, byte[]> store = storeSupplier.get();
+ if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
+ store = new WindowToTimestampedWindowByteStoreAdapter(store);
+ }
return new MeteredTimestampedWindowStore<>(
- maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+ maybeWrapCaching(maybeWrapLogging(store)),
storeSupplier.windowSize(),
storeSupplier.metricsScope(),
time,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
new file mode 100644
index 0000000..7bd8665
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+
+import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
+import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
+
+class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, byte[]> {
+ final WindowStore<Bytes, byte[]> store;
+
+ WindowToTimestampedWindowByteStoreAdapter(final WindowStore<Bytes, byte[]> store) {
+ if (!store.persistent()) {
+ throw new IllegalArgumentException("Provided store must be a persistent store, but it is not.");
+ }
+ this.store = store;
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] valueWithTimestamp) {
+ store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] valueWithTimestamp,
+ final long windowStartTimestamp) {
+ store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp), windowStartTimestamp);
+ }
+
+ @Override
+ public byte[] fetch(final Bytes key,
+ final long time) {
+ return convertToTimestampedFormat(store.fetch(key, time));
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public WindowStoreIterator<byte[]> fetch(final Bytes key,
+ final long timeFrom,
+ final long timeTo) {
+ return new WindowToTimestampedWindowIteratorAdapter(store.fetch(key, timeFrom, timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> fetch(final Bytes key,
+ final Instant from,
+ final Instant to) {
+ return new WindowToTimestampedWindowIteratorAdapter(store.fetch(key, from, to));
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+ final Bytes to,
+ final long timeFrom,
+ final long timeTo) {
+ return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetch(from, to, timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+ final Bytes to,
+ final Instant fromTime,
+ final Instant toTime) {
+ return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetch(from, to, fromTime, toTime));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+ return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all());
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
+ final long timeTo) {
+ return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from,
+ final Instant to) {
+ return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(from, to));
+ }
+
+ @Override
+ public String name() {
+ return store.name();
+ }
+
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ store.init(context, root);
+ }
+
+ @Override
+ public void flush() {
+ store.flush();
+ }
+
+ @Override
+ public void close() {
+ store.close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return true;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return store.isOpen();
+ }
+
+
+ private static class WindowToTimestampedWindowIteratorAdapter
+ extends KeyValueToTimestampedKeyValueIteratorAdapter<Long>
+ implements WindowStoreIterator<byte[]> {
+
+ WindowToTimestampedWindowIteratorAdapter(final KeyValueIterator<Long, byte[]> innerIterator) {
+ super(innerIterator);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 4819ac1..e520df4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -36,75 +36,112 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
public class StoresTest {
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() {
- Stores.persistentKeyValueStore(null);
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentKeyValueStore(null));
+ assertEquals("name cannot be null", e.getMessage());
+ }
+
+ @Test
+ public void shouldThrowIfPersistentTimestampedKeyValueStoreStoreNameIsNull() {
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentTimestampedKeyValueStore(null));
+ assertEquals("name cannot be null", e.getMessage());
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
- //noinspection ResultOfMethodCallIgnored
- Stores.inMemoryKeyValueStore(null);
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.inMemoryKeyValueStore(null));
+ assertEquals("name cannot be null", e.getMessage());
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowIfILruMapStoreNameIsNull() {
- Stores.lruMap(null, 0);
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.lruMap(null, 0));
+ assertEquals("name cannot be null", e.getMessage());
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void shouldThrowIfILruMapStoreCapacityIsNegative() {
- Stores.lruMap("anyName", -1);
+ final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.lruMap("anyName", -1));
+ assertEquals("maxCacheSize cannot be negative", e.getMessage());
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
- Stores.persistentWindowStore(null, ZERO, ZERO, false);
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentWindowStore(null, ZERO, ZERO, false));
+ assertEquals("name cannot be null", e.getMessage());
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
+ public void shouldThrowIfIPersistentTimestampedWindowStoreStoreNameIsNull() {
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentTimestampedWindowStore(null, ZERO, ZERO, false));
+ assertEquals("name cannot be null", e.getMessage());
+ }
+
+ @Test
public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
- Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false);
+ final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false));
+ assertEquals("retentionPeriod cannot be negative", e.getMessage());
+ }
+
+ @Test
+ public void shouldThrowIfIPersistentTimestampedWindowStoreRetentionPeriodIsNegative() {
+ final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentTimestampedWindowStore("anyName", ofMillis(-1L), ZERO, false));
+ assertEquals("retentionPeriod cannot be negative", e.getMessage());
}
@Deprecated
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
- Stores.persistentWindowStore("anyName", 0L, 1, 0L, false);
+ final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", 0L, 1, 0L, false));
+ assertEquals("numSegments cannot be smaller than 2", e.getMessage());
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
- Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false);
+ final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false));
+ assertEquals("windowSize cannot be negative", e.getMessage());
}
- @Test(expected = NullPointerException.class)
- public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
- Stores.persistentSessionStore(null, ofMillis(0));
+ @Test
+ public void shouldThrowIfIPersistentTimestampedWindowStoreIfWindowSizeIsNegative() {
+ final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentTimestampedWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false));
+ assertEquals("windowSize cannot be negative", e.getMessage());
+ }
+ @Test
+ public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentSessionStore(null, ofMillis(0)));
+ assertEquals("name cannot be null", e.getMessage());
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void shouldThrowIfIPersistentSessionStoreRetentionPeriodIsNegative() {
- Stores.persistentSessionStore("anyName", ofMillis(-1));
+ final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentSessionStore("anyName", ofMillis(-1)));
+ assertEquals("retentionPeriod cannot be negative", e.getMessage());
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowIfSupplierIsNullForWindowStoreBuilder() {
- Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
+ assertEquals("supplier cannot be null", e.getMessage());
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowIfSupplierIsNullForKeyValueStoreBuilder() {
- Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
+ assertEquals("supplier cannot be null", e.getMessage());
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
- Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+ final Exception e = assertThrows(NullPointerException.class, () -> Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
+ assertEquals("supplier cannot be null", e.getMessage());
}
@Test
@@ -125,6 +162,11 @@ public class StoresTest {
}
@Test
+ public void shouldCreateRocksDbTimestampedStore() {
+ assertThat(Stores.persistentTimestampedKeyValueStore("store").get(), instanceOf(RocksDBTimestampedStore.class));
+ }
+
+ @Test
public void shouldCreateRocksDbWindowStore() {
final WindowStore store = Stores.persistentWindowStore("store", ofMillis(1L), ofMillis(1L), false).get();
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
@@ -133,11 +175,49 @@ public class StoresTest {
}
@Test
+ public void shouldCreateRocksDbTimestampedWindowStore() {
+ final WindowStore store = Stores.persistentTimestampedWindowStore("store", ofMillis(1L), ofMillis(1L), false).get();
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertThat(store, instanceOf(RocksDBWindowStore.class));
+ assertThat(wrapped, instanceOf(RocksDBTimestampedSegmentedBytesStore.class));
+ }
+
+ @Test
public void shouldCreateRocksDbSessionStore() {
assertThat(Stores.persistentSessionStore("store", ofMillis(1)).get(), instanceOf(RocksDBSessionStore.class));
}
@Test
+ public void shouldBuildKeyValueStore() {
+ final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore("name"),
+ Serdes.String(),
+ Serdes.String()
+ ).build();
+ assertThat(store, not(nullValue()));
+ }
+
+ @Test
+ public void shouldBuildTimestampedKeyValueStore() {
+ final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore("name"),
+ Serdes.String(),
+ Serdes.String()
+ ).build();
+ assertThat(store, not(nullValue()));
+ }
+
+ @Test
+ public void shouldBuildTimestampedKeyValueStoreThatWrapsKeyValueStore() {
+ final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentKeyValueStore("name"),
+ Serdes.String(),
+ Serdes.String()
+ ).build();
+ assertThat(store, not(nullValue()));
+ }
+
+ @Test
public void shouldBuildWindowStore() {
final WindowStore<String, String> store = Stores.windowStoreBuilder(
Stores.persistentWindowStore("store", ofMillis(3L), ofMillis(3L), true),
@@ -148,9 +228,9 @@ public class StoresTest {
}
@Test
- public void shouldBuildKeyValueStore() {
- final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
- Stores.persistentKeyValueStore("name"),
+ public void shouldBuildTimestampedWindowStore() {
+ final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore("store", ofMillis(3L), ofMillis(3L), true),
Serdes.String(),
Serdes.String()
).build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
index e6dbc66..7b0eb6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@@ -35,6 +34,9 @@ import org.junit.runner.RunWith;
import java.util.Collections;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -49,9 +51,9 @@ public class TimestampedKeyValueStoreBuilderTest {
@Before
public void setUp() {
- EasyMock.expect(supplier.get()).andReturn(inner);
- EasyMock.expect(supplier.name()).andReturn("name");
- EasyMock.replay(supplier);
+ expect(supplier.get()).andReturn(inner);
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
builder = new TimestampedKeyValueStoreBuilder<>(
supplier,
Serdes.String(),
@@ -114,6 +116,34 @@ public class TimestampedKeyValueStoreBuilderTest {
assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
}
+ @Test
+ public void shouldNotWrapTimestampedByteStore() {
+ reset(supplier);
+ expect(supplier.get()).andReturn(new RocksDBTimestampedStore("name"));
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
+
+ final TimestampedKeyValueStore<String, String> store = builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertThat(((WrappedStateStore) store).wrapped(), instanceOf(RocksDBTimestampedStore.class));
+ }
+
+ @Test
+ public void shouldWrapPlainKeyValueStoreAsTimestampStore() {
+ reset(supplier);
+ expect(supplier.get()).andReturn(new RocksDBStore("name"));
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
+
+ final TimestampedKeyValueStore<String, String> store = builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertThat(((WrappedStateStore) store).wrapped(), instanceOf(KeyValueToTimestampedKeyValueByteStoreAdapter.class));
+ }
+
@SuppressWarnings("all")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
new file mode 100644
index 0000000..7be31ea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class TimestampedWindowStoreBuilderTest {
+
+ @Mock(type = MockType.NICE)
+ private WindowBytesStoreSupplier supplier;
+ @Mock(type = MockType.NICE)
+ private WindowStore<Bytes, byte[]> inner;
+ private TimestampedWindowStoreBuilder<String, String> builder;
+
+ @Before
+ public void setUp() {
+ expect(supplier.get()).andReturn(inner);
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
+
+ builder = new TimestampedWindowStoreBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+ }
+
+ @Test
+ public void shouldHaveMeteredStoreAsOuterStore() {
+ final TimestampedWindowStore<String, String> store = builder.build();
+ assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreByDefault() {
+ final TimestampedWindowStore<String, String> store = builder.build();
+ final StateStore next = ((WrappedStateStore) store).wrapped();
+ assertThat(next, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
+ }
+
+ @Test
+ public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+ final TimestampedWindowStore<String, String> store = builder.withLoggingDisabled().build();
+ final StateStore next = ((WrappedStateStore) store).wrapped();
+ assertThat(next, CoreMatchers.equalTo(inner));
+ }
+
+ @Test
+ public void shouldHaveCachingStoreWhenEnabled() {
+ final TimestampedWindowStore<String, String> store = builder.withCachingEnabled().build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
+ assertThat(wrapped, instanceOf(CachingWindowStore.class));
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+ final TimestampedWindowStore<String, String> store = builder
+ .withLoggingEnabled(Collections.emptyMap())
+ .build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
+ assertThat(wrapped, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
+ assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner));
+ }
+
+ @Test
+ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+ final TimestampedWindowStore<String, String> store = builder
+ .withLoggingEnabled(Collections.emptyMap())
+ .withCachingEnabled()
+ .build();
+ final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
+ final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped();
+ assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
+ assertThat(caching, instanceOf(CachingWindowStore.class));
+ assertThat(changeLogging, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
+ assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
+ }
+
+ @Test
+ public void shouldNotWrapTimestampedByteStore() {
+ reset(supplier);
+ expect(supplier.get()).andReturn(new RocksDBTimestampedWindowStore(
+ new RocksDBTimestampedSegmentedBytesStore(
+ "name",
+ "metric-scope",
+ 10L,
+ 5L,
+ new WindowKeySchema()),
+ false,
+ 1L));
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
+
+ final TimestampedWindowStore<String, String> store = builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertThat(((WrappedStateStore) store).wrapped(), instanceOf(RocksDBTimestampedWindowStore.class));
+ }
+
+ @Test
+ public void shouldWrapPlainKeyValueStoreAsTimestampStore() {
+ reset(supplier);
+ expect(supplier.get()).andReturn(new RocksDBWindowStore(
+ new RocksDBSegmentedBytesStore(
+ "name",
+ "metric-scope",
+ 10L,
+ 5L,
+ new WindowKeySchema()),
+ false,
+ 1L));
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
+
+ final TimestampedWindowStore<String, String> store = builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertThat(((WrappedStateStore) store).wrapped(), instanceOf(WindowToTimestampedWindowByteStoreAdapter.class));
+ }
+
+ @SuppressWarnings("all")
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfInnerIsNull() {
+ new TimestampedWindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfKeySerdeIsNull() {
+ new TimestampedWindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfValueSerdeIsNull() {
+ new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfTimeIsNull() {
+ new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+ }
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
index 022f6dd..bf29d4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@@ -34,6 +33,8 @@ import org.junit.runner.RunWith;
import java.util.Collections;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -47,16 +48,16 @@ public class WindowStoreBuilderTest {
private WindowStoreBuilder<String, String> builder;
@Before
- public void setUp() throws Exception {
- EasyMock.expect(supplier.get()).andReturn(inner);
- EasyMock.expect(supplier.name()).andReturn("name");
- EasyMock.replay(supplier);
-
- builder = new WindowStoreBuilder<>(supplier,
- Serdes.String(),
- Serdes.String(),
- new MockTime());
-
+ public void setUp() {
+ expect(supplier.get()).andReturn(inner);
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
+
+ builder = new WindowStoreBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
}
@Test
@@ -76,7 +77,7 @@ public class WindowStoreBuilderTest {
public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
final WindowStore<String, String> store = builder.withLoggingDisabled().build();
final StateStore next = ((WrappedStateStore) store).wrapped();
- assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+ assertThat(next, CoreMatchers.equalTo(inner));
}
@Test
@@ -90,18 +91,18 @@ public class WindowStoreBuilderTest {
@Test
public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
final WindowStore<String, String> store = builder
- .withLoggingEnabled(Collections.<String, String>emptyMap())
+ .withLoggingEnabled(Collections.emptyMap())
.build();
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertThat(store, instanceOf(MeteredWindowStore.class));
assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class));
- assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.<StateStore>equalTo(inner));
+ assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner));
}
@Test
public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
final WindowStore<String, String> store = builder
- .withLoggingEnabled(Collections.<String, String>emptyMap())
+ .withLoggingEnabled(Collections.emptyMap())
.withCachingEnabled()
.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
@@ -109,9 +110,10 @@ public class WindowStoreBuilderTest {
assertThat(store, instanceOf(MeteredWindowStore.class));
assertThat(caching, instanceOf(CachingWindowStore.class));
assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class));
- assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner));
+ assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
}
+ @SuppressWarnings("all")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {
new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());