You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/09/08 21:02:09 UTC
kafka git commit: KAFKA-5852: Add filter, filterNot,
mapValues and Materialized to KTable
Repository: kafka
Updated Branches:
refs/heads/trunk 6e4045586 -> 2db1e4423
KAFKA-5852: Add filter, filterNot, mapValues and Materialized to KTable
Add overloads of `filter`, `filterNot`, `mapValues` that take `Materialized` as a param to `KTable`. Deprecate overloads using `storeName` and `storeSupplier`
Author: Damian Guy <da...@gmail.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3807 from dguy/ktable-filter-map
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2db1e442
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2db1e442
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2db1e442
Branch: refs/heads/trunk
Commit: 2db1e4423fb33405a319b253b040e57e069c1f7a
Parents: 6e40455
Author: Damian Guy <da...@gmail.com>
Authored: Fri Sep 8 14:01:58 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Sep 8 14:01:58 2017 -0700
----------------------------------------------------------------------
docs/streams/developer-guide.html | 11 +-
.../apache/kafka/streams/kstream/KTable.java | 140 +++++++++++++-
.../kafka/streams/kstream/Materialized.java | 184 +++++++++++++++++++
.../streams/kstream/internals/KTableImpl.java | 60 ++++++
.../internals/KeyValueStoreMaterializer.java | 52 ++++++
.../kstream/internals/MaterializedInternal.java | 62 +++++++
.../QueryableStateIntegrationTest.java | 13 +-
.../kstream/internals/KTableFilterTest.java | 28 +++
.../kstream/internals/KTableImplTest.java | 22 +++
.../kstream/internals/KTableMapValuesTest.java | 9 +-
.../KeyValueStoreMaterializerTest.java | 116 ++++++++++++
11 files changed, 683 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 42a9b20..ab5a823 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -707,7 +707,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
</p>
<pre class="brush: java;">
KStream<String, Long> stream = ...;
-
+ KTable<String, Long> table = ...;
// A filter that selects (keeps) only positive numbers
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);
@@ -720,6 +720,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
return value > 0;
}
});
+
+ // A filter on a KTable that materializes the result into a StateStore
+ table.filter((key, value) -> value != 0, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("filtered"));
</pre>
</td>
</tr>
@@ -991,7 +994,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
</p>
<pre class="brush: java;">
- KStream<byte[], String> stream = ...;
+ KStream<byte[], String> stream = ...;
+ KTable<String, String> table = ...;
// Java 8+ example, using lambda expressions
KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());
@@ -1004,6 +1008,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
return s.toUpperCase();
}
});
+
+ // mapValues on a KTable and also materialize the results into a statestore
+ table.mapValues(value -> value.toUpperCase(), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("uppercased"));
</pre>
</td>
</tr>
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 4bc9572..2571ac1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -19,14 +19,17 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -116,6 +119,44 @@ public interface KTable<K, V> {
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
+ * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+ * <p>
+ *
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized
+ * @return a {@code KTable} that contains only those records that satisfy the given predicate
+ * @see #filterNot(Predicate, Materialized)
+ */
+ KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+
+ /**
+ * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
+ * predicate.
+ * All records that do not satisfy the predicate are dropped.
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+ * result {@code KTable}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * Note that {@code filter} for a <i>changelog stream</i> works differently from {@link KStream#filter(Predicate)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+ * directly if required (i.e., if there is anything to be deleted).
+ * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
+ * is forwarded.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // filtering words
+ * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * K key = "some-word";
+ * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
*
* @param predicate a filter {@link Predicate} that is applied to each record
@@ -124,8 +165,10 @@ public interface KTable<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
* (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}.
* @return a {@code KTable} that contains only those records that satisfy the given predicate
- * @see #filterNot(Predicate)
+ * @see #filterNot(Predicate, Materialized)
+ * @deprecated use {@link #filter(Predicate, Materialized)}
*/
+ @Deprecated
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
/**
@@ -159,8 +202,10 @@ public interface KTable<K, V> {
* @param predicate a filter {@link Predicate} that is applied to each record
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains only those records that satisfy the given predicate
- * @see #filterNot(Predicate)
+ * @see #filterNot(Predicate, Materialized)
+ * @deprecated use {@link #filter(Predicate, Materialized)}
*/
+ @Deprecated
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -211,12 +256,50 @@ public interface KTable<K, V> {
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
+ * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+ * <p>
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized
+ * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
+ * @see #filter(Predicate, Materialized)
+ */
+ KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+ /**
+ * Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
+ * given predicate.
+ * All records that <em>do</em> satisfy the predicate are dropped.
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+ * result {@code KTable}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * Note that {@code filterNot} for a <i>changelog stream</i> works differently from {@link KStream#filterNot(Predicate)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+ * directly if required (i.e., if there is anything to be deleted).
+ * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+ * forwarded.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // filtering words
+ * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * K key = "some-word";
+ * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* @param predicate a filter {@link Predicate} that is applied to each record
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
- * @see #filter(Predicate)
+ * @see #filter(Predicate, Materialized)
+ * @deprecated use {@link #filterNot(Predicate, Materialized)}
*/
+ @Deprecated
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@@ -252,8 +335,10 @@ public interface KTable<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
* (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}.
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
- * @see #filter(Predicate)
+ * @see #filter(Predicate, Materialized)
+ * @deprecated use {@link #filterNot(Predicate, Materialized)}
*/
+ @Deprecated
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
@@ -291,6 +376,49 @@ public interface KTable<K, V> {
*/
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
+ /**
+ * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+ * (with possible new type) in the new {@code KTable}.
+ * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
+ * computes a new value for it, resulting in an update record for the result {@code KTable}.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * The example below counts the number of tokens in the value string.
+ * <pre>{@code
+ * KTable<String, String> inputTable = builder.table("topic");
+ * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
+ * Integer apply(String value) {
+ * return value.split(" ").length;
+ * }
+ * });
+ * }</pre>
+ * <p>
+ * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+ * <p>
+ * This operation preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+ * the result {@code KTable}.
+ * <p>
+ * Note that {@code mapValues} for a <i>changelog stream</i> works differently from {@link KStream#mapValues(ValueMapper)
+ * record stream value mappings}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+ * delete the corresponding record in the result {@code KTable}.
+ *
+ * @param mapper a {@link ValueMapper} that computes a new output value
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized
+ * @param <VR> the value type of the result {@code KTable}
+ *
+ * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ */
+ <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+ final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
@@ -335,7 +463,9 @@ public interface KTable<K, V> {
* @param <VR> the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
*/
+ @Deprecated
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
/**
@@ -377,7 +507,9 @@ public interface KTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
*/
+ @Deprecated
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Serde<VR> valueSerde,
final StateStoreSupplier<KeyValueStore> storeSupplier);
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
new file mode 100644
index 0000000..fb2e7a6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Used to describe how a {@link StateStore} should be materialized.
+ * You can either provide a custom {@link StateStore} backend
+ * through one of the provided methods accepting a supplier or use the default RocksDB backends
+ * by providing just a store name.
+ */
+public class Materialized<K, V, S extends StateStore> {
+ protected StoreSupplier<S> storeSupplier;
+ protected String storeName;
+ protected Serde<V> valueSerde;
+ protected Serde<K> keySerde;
+ protected boolean loggingEnabled = true;
+ protected boolean cachingEnabled = true;
+ protected Map<String, String> topicConfig = new HashMap<>();
+
+ private Materialized(final StoreSupplier<S> storeSupplier) {
+ this.storeSupplier = storeSupplier;
+ }
+
+ private Materialized(final String storeName) {
+ this.storeName = storeName;
+ }
+
+ /**
+ * Copy constructor.
+ * @param materialized the {@link Materialized} instance to copy.
+ */
+ protected Materialized(final Materialized<K, V, S> materialized) {
+ this.storeSupplier = materialized.storeSupplier;
+ this.storeName = materialized.storeName;
+ this.keySerde = materialized.keySerde;
+ this.valueSerde = materialized.valueSerde;
+ this.loggingEnabled = materialized.loggingEnabled;
+ this.cachingEnabled = materialized.cachingEnabled;
+ this.topicConfig = materialized.topicConfig;
+ }
+
+ /**
+ * Materialize a {@link StateStore} with the given name.
+ *
+ * @param storeName name of the store to materialize
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @param <S> type of the {@link StateStore}
+ * @return a new {@link Materialized} instance with the given storeName
+ */
+ public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) {
+ return new Materialized<>(storeName);
+ }
+
+ /**
+ * Materialize a {@link WindowStore} using the provided {@link WindowBytesStoreSupplier}.
+ *
+ * @param supplier the {@link WindowBytesStoreSupplier} used to materialize the store
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return a new {@link Materialized} instance with the given supplier
+ */
+ public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier supplier) {
+ return new Materialized<>(supplier);
+ }
+
+ /**
+ * Materialize a {@link SessionStore} using the provided {@link SessionBytesStoreSupplier}.
+ *
+ * @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return a new {@link Materialized} instance with the given supplier
+ */
+ public static <K, V> Materialized<K, V, SessionStore<Bytes, byte[]>> as(final SessionBytesStoreSupplier supplier) {
+ return new Materialized<>(supplier);
+ }
+
+ /**
+ * Materialize a {@link KeyValueStore} using the provided {@link KeyValueBytesStoreSupplier}.
+ *
+ * @param supplier the {@link KeyValueBytesStoreSupplier} used to materialize the store
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return a new {@link Materialized} instance with the given supplier
+ */
+ public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final KeyValueBytesStoreSupplier supplier) {
+ return new Materialized<>(supplier);
+ }
+
+ /**
+ * Set the valueSerde the materialized {@link StateStore} will use.
+ *
+ * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
+ * serde from configs will be used
+ * @return itself
+ */
+ public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde) {
+ this.valueSerde = valueSerde;
+ return this;
+ }
+
+ /**
+ * Set the keySerde the materialized {@link StateStore} will use.
+ * @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key
+ * serde from configs will be used
+ * @return itself
+ */
+ public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde) {
+ this.keySerde = keySerde;
+ return this;
+ }
+
+ /**
+ * Indicates that a changelog should be created for the store. The changelog will be created
+ * with the provided configs.
+ * <p>
+ * Note: Any unrecognized configs will be ignored.
+ * @param config any configs that should be applied to the changelog
+ * @return itself
+ */
+ public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> config) {
+ loggingEnabled = true;
+ this.topicConfig = config;
+ return this;
+ }
+
+ /**
+ * Disable change logging for the materialized {@link StateStore}.
+ * @return itself
+ */
+ public Materialized<K, V, S> withLoggingDisabled() {
+ loggingEnabled = false;
+ this.topicConfig.clear();
+ return this;
+ }
+
+ /**
+ * Enable caching for the materialized {@link StateStore}.
+ * @return itself
+ */
+ public Materialized<K, V, S> withCachingEnabled() {
+ cachingEnabled = true;
+ return this;
+ }
+
+ /**
+ * Disable caching for the materialized {@link StateStore}.
+ * @return itself
+ */
+ public Materialized<K, V, S> withCachingDisabled() {
+ cachingEnabled = false;
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 87277b6..d3d6ce2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -33,6 +35,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
@@ -155,6 +158,30 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
}
+ private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
+ final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized,
+ final boolean filterNot) {
+ String name = builder.newName(FILTER_NAME);
+
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this,
+ predicate,
+ filterNot,
+ materialized.storeName());
+ builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+
+ final StoreBuilder builder = new KeyValueStoreMaterializer<>(materialized).materialize();
+ this.builder.internalTopologyBuilder.addStateStore(builder, name);
+
+ return new KTableImpl<>(this.builder,
+ name,
+ processorSupplier,
+ this.keySerde,
+ this.valSerde,
+ sourceNodes,
+ builder.name(),
+ true);
+ }
+
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate) {
return filter(predicate, (String) null);
@@ -162,6 +189,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ return doFilter(predicate, new MaterializedInternal<>(materialized), false);
+ }
+
+ @Override
+ public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final String queryableStoreName) {
StateStoreSupplier<KeyValueStore> storeSupplier = null;
if (queryableStoreName != null) {
@@ -184,6 +219,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ return doFilter(predicate, new MaterializedInternal<>(materialized), true);
+ }
+
+ @Override
+ public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final String queryableStoreName) {
StateStoreSupplier<KeyValueStore> storeSupplier = null;
if (queryableStoreName != null) {
@@ -224,6 +267,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
+ public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+ final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
+ = new MaterializedInternal<>(materialized);
+ final String name = builder.newName(MAPVALUES_NAME);
+ final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this,
+ mapper,
+ materializedInternal.storeName());
+ builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+ builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+ name);
+ return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
+ }
+
+ @Override
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
final Serde<V1> valueSerde,
final String queryableStoreName) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
new file mode 100644
index 0000000..1d702f2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+public class KeyValueStoreMaterializer<K, V> {
+ private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
+
+ public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ this.materialized = materialized;
+ }
+
+ public StoreBuilder<KeyValueStore<K, V>> materialize() {
+ KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
+ if (supplier == null) {
+ supplier = Stores.persistentKeyValueStore(materialized.storeName());
+ }
+ final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(supplier,
+ materialized.keySerde(),
+ materialized.valueSerde());
+
+ if (materialized.loggingEnabled()) {
+ builder.withLoggingEnabled(materialized.logConfig());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ if (materialized.cachingEnabled()) {
+ builder.withCachingEnabled();
+ }
+ return builder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
new file mode 100644
index 0000000..d7ebc65
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+
+import java.util.Map;
+
+public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> { // Subclass of Materialized that adds public read accessors for fields it inherits.
+
+ public MaterializedInternal(final Materialized<K, V, S> materialized) { // delegates to the superclass constructor that takes a Materialized
+ super(materialized);
+ }
+
+ public String storeName() { // the explicit store name when set, otherwise the name reported by the supplier
+ if (storeName != null) {
+ return storeName;
+ }
+ return storeSupplier.name(); // NOTE(review): NullPointerException if storeSupplier is also null; presumably Materialized guarantees one of the two is set, confirm
+ }
+
+ public StoreSupplier<S> storeSupplier() { // returns the inherited field unchanged; KeyValueStoreMaterializer treats a null result as "no supplier configured"
+ return storeSupplier;
+ }
+
+ public Serde<K> keySerde() { // returns the inherited keySerde field unchanged
+ return keySerde;
+ }
+
+ public Serde<V> valueSerde() { // returns the inherited valueSerde field unchanged
+ return valueSerde;
+ }
+
+ public boolean loggingEnabled() { // returns the inherited loggingEnabled flag
+ return loggingEnabled;
+ }
+
+ public Map<String, String> logConfig() { // exposes the inherited topicConfig map under a different name; the map is returned without copying
+ return topicConfig;
+ }
+
+ public boolean cachingEnabled() { // returns the inherited cachingEnabled flag
+ return cachingEnabled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 0e9bc33..dc59fb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
@@ -39,11 +40,13 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -446,8 +449,8 @@ public class QueryableStateIntegrationTest {
}
};
final KTable<String, Long> t1 = builder.table(streamOne);
- final KTable<String, Long> t2 = t1.filter(filterPredicate, "queryFilter");
- t1.filterNot(filterPredicate, "queryFilterNot");
+ final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
+ t1.filterNot(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilterNot"));
t2.to(outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -509,7 +512,7 @@ public class QueryableStateIntegrationTest {
public Long apply(final String value) {
return Long.valueOf(value);
}
- }, Serdes.Long(), "queryMapValues");
+ }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
t2.to(Serdes.String(), Serdes.Long(), outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -559,13 +562,13 @@ public class QueryableStateIntegrationTest {
}
};
final KTable<String, String> t1 = builder.table(streamOne);
- final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
+ final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
@Override
public Long apply(final String value) {
return Long.valueOf(value);
}
- }, Serdes.Long(), "queryMapValues");
+ }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
t3.to(Serdes.String(), Serdes.Long(), outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 3350072..a885edd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -18,9 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
@@ -94,6 +97,7 @@ public class KTableFilterTest {
doTestKTable(builder, table2, table3, topic1);
}
+ @SuppressWarnings("deprecation")
@Test
public void testQueryableKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -118,6 +122,30 @@ public class KTableFilterTest {
doTestKTable(builder, table2, table3, topic1);
}
+ @Test
+ public void shouldAddQueryableStore() { // builds one table via filter with a named Materialized and one via filterNot without it, then runs the shared check
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final String topic1 = "topic1";
+
+ KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName"); // source table both derived tables are built from
+
+ KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() { // predicate is true for even values
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter")); // Materialized overload, with the store name given as "anyStoreNameFilter"
+ KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() { // same even-value predicate, passed to filterNot with no Materialized argument
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ doTestKTable(builder, table2, table3, topic1); // helper not shown in this hunk; presumably drives the topology and performs the assertions
+ }
+
private void doTestValueGetter(final StreamsBuilder builder,
final KTableImpl<String, Integer, Integer> table2,
final KTableImpl<String, Integer, Integer> table3,
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index f06cc63..64ae6de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -18,15 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
@@ -474,4 +477,23 @@ public class KTableImplTest {
table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
}
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() { // passes only if filter throws NullPointerException when given a null Materialized
+ table.filter(new Predicate<String, String>() {
+ @Override
+ public boolean test(final String key, final String value) {
+ return false; // constant result; the test only expects the filter call itself to throw
+ }
+ }, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null); // the cast selects the Materialized overload (filter also has a String store-name overload)
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() { // passes only if filterNot throws NullPointerException when given a null Materialized
+ table.filterNot(new Predicate<String, String>() {
+ @Override
+ public boolean test(final String key, final String value) {
+ return false; // constant result; the test only expects the filterNot call itself to throw
+ }
+ }, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null); // the cast selects the Materialized overload (filterNot also has a String store-name overload)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 2e7ccad..4bfaea6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@@ -92,7 +95,7 @@ public class KTableMapValuesTest {
public Integer apply(CharSequence value) {
return value.charAt(0) - 48;
}
- }, Serdes.Integer(), "anyName");
+ }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
@@ -249,14 +252,14 @@ public class KTableMapValuesTest {
public Integer apply(String value) {
return new Integer(value);
}
- }, Serdes.Integer(), "anyMapName");
+ }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyMapName").withValueSerde(Serdes.Integer()));
KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
- }, "anyFilterName");
+ }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer()));
KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
table1.through(stringSerde, stringSerde, topic2, storeName2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/2db1e442/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
new file mode 100644
index 0000000..21a5d57
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+import org.easymock.EasyMock;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNot.not;
+
+
+public class KeyValueStoreMaterializerTest { // Checks which wrapper layers the builder returned by KeyValueStoreMaterializer.materialize() puts around the store.
+
+ @Test
+ public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() { // Materialized.as("store") with no further options: expects metered -> caching -> change-logging
+ final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+ final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
+ final KeyValueStore<String, String> store = builder.build();
+ final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); // first layer beneath the outermost store
+ final StateStore logging = caching.wrappedStore(); // second layer beneath the outermost store
+ assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+ assertThat(caching, instanceOf(CachedStateStore.class));
+ assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ }
+
+ @Test
+ public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() { // with caching disabled the change-logging store is expected directly beneath the outermost store
+ final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+ .withCachingDisabled());
+ final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
+ final KeyValueStore<String, String> store = builder.build();
+ final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); // first layer beneath the outermost store
+ assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ }
+
+ @Test
+ public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() { // with logging disabled the caching layer must remain and must not wrap a change-logging store
+ final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+ .withLoggingDisabled());
+ final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
+ final KeyValueStore<String, String> store = builder.build();
+ final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); // first layer beneath the outermost store
+ assertThat(caching, instanceOf(CachedStateStore.class));
+ assertThat(caching.wrappedStore(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+ }
+
+ @Test
+ public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() { // with both disabled the layer beneath the outermost store is neither caching nor change-logging
+ final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+ .withCachingDisabled()
+ .withLoggingDisabled());
+ final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
+ final KeyValueStore<String, String> store = builder.build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); // first layer beneath the outermost store
+ assertThat(wrapped, not(instanceOf(CachedStateStore.class)));
+ assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+ }
+
+ @Test
+ public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() { // a caller-provided supplier must be used, so the built store's inner() is the supplier's store
+ final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class);
+ final InMemoryKeyValueStore<Bytes, byte[]> store = new InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray());
+ EasyMock.expect(supplier.name()).andReturn("name").anyTimes(); // name() may be called any number of times
+ EasyMock.expect(supplier.get()).andReturn(store); // the mock hands out the in-memory store created above
+ EasyMock.replay(supplier);
+
+ final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.<String, Integer>as(supplier)); // uses the supplier-taking overload of Materialized.as rather than the store-name one
+ final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
+ final KeyValueStore<String, Integer> built = builder.build();
+ final StateStore inner = ((WrappedStateStore) built).inner(); // inner() rather than wrappedStore() is compared against the supplied store
+
+ assertThat(inner, CoreMatchers.<StateStore>equalTo(store));
+ }
+
+}
\ No newline at end of file