You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 23:16:00 UTC
[3/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for
queryable stores
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 290142b..e6219c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -24,7 +24,10 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
/**
@@ -47,7 +50,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* final KafkaStreams streams = ...;
* streams.start()
* ...
- * final String queryableStoreName = table.getStoreName(); // returns null if KTable is not queryable
+ * final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
* ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
* view.get(key);
*}</pre>
@@ -87,6 +90,79 @@ public interface KTable<K, V> {
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
/**
+ * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
+ * predicate.
+ * All records that do not satisfy the predicate are dropped.
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+ * result {@code KTable}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * Note that {@code filter} for a <i>changelog stream</i> works different to {@link KStream#filter(Predicate)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+ * directly if required (i.e., if there is anything to be deleted).
+ * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
+ * is forwarded.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // filtering words
+ * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * K key = "some-word";
+ * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ *
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+ * used to subsequently query the operation results; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+ * (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}).
+ * @return a {@code KTable} that contains only those records that satisfy the given predicate
+ * @see #filterNot(Predicate)
+ */
+ KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
+
+ /**
+ * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
+ * predicate.
+ * All records that do not satisfy the predicate are dropped.
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+ * result {@code KTable}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * Note that {@code filter} for a <i>changelog stream</i> works different to {@link KStream#filter(Predicate)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+ * directly if required (i.e., if there is anything to be deleted).
+ * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
+ * is forwarded.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // filtering words
+ * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * K key = "some-word";
+ * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ *
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+ * @return a {@code KTable} that contains only those records that satisfy the given predicate
+ * @see #filterNot(Predicate)
+ */
+ KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+ /**
 * Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
* given predicate.
* All records that <em>do</em> satisfy the predicate are dropped.
@@ -109,6 +185,78 @@ public interface KTable<K, V> {
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
/**
+ * Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
+ * given predicate.
+ * All records that <em>do</em> satisfy the predicate are dropped.
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+ * result {@code KTable}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * Note that {@code filterNot} for a <i>changelog stream</i> works different to {@link KStream#filterNot(Predicate)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+ * directly if required (i.e., if there is anything to be deleted).
+ * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+ * forwarded.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // filtering words
+ * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * K key = "some-word";
+ * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+ * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
+ * @see #filter(Predicate)
+ */
+ KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+ /**
+ * Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
+ * given predicate.
+ * All records that <em>do</em> satisfy the predicate are dropped.
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+ * result {@code KTable}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * Note that {@code filterNot} for a <i>changelog stream</i> works different to {@link KStream#filterNot(Predicate)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+ * directly if required (i.e., if there is anything to be deleted).
+ * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+ * forwarded.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // filtering words
+ * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+ * K key = "some-word";
+ * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+ * used to subsequently query the operation results; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+ * (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}).
+ * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
+ * @see #filter(Predicate)
+ */
+ KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
+
+
+ /**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
 * (with possible new type) in the new {@code KTable}.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
@@ -144,6 +292,97 @@ public interface KTable<K, V> {
/**
+ * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+ * (with possible new type) in the new {@code KTable}.
+ * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
+ * computes a new value for it, resulting in an update record for the result {@code KTable}.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * The example below counts the number of tokens of the value string.
+ * <pre>{@code
+ * KTable<String, String> inputTable = builder.table("topic");
+ * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer> {
+ * Integer apply(String value) {
+ * return value.split(" ").length;
+ * }
+ * });
+ * }</pre>
+ * <p>
+ * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * <p>
+ * This operation preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+ * the result {@code KTable}.
+ * <p>
+ * Note that {@code mapValues} for a <i>changelog stream</i> works different to {@link KStream#mapValues(ValueMapper)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+ * delete the corresponding record in the result {@code KTable}.
+ *
+ * @param mapper a {@link ValueMapper} that computes a new output value
+ * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+ * used to subsequently query the operation results; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+ * (i.e., that would be equivalent to calling {@link KTable#mapValues(ValueMapper)}).
+ * @param valueSerde serializer for new value type
+ * @param <VR> the value type of the result {@code KTable}
+ *
+ * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ */
+ <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
+
+ /**
+ * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+ * (with possible new type) in the new {@code KTable}.
+ * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
+ * computes a new value for it, resulting in an update record for the result {@code KTable}.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * The example below counts the number of tokens of the value string.
+ * <pre>{@code
+ * KTable<String, String> inputTable = builder.table("topic");
+ * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer> {
+ * Integer apply(String value) {
+ * return value.split(" ").length;
+ * }
+ * });
+ * }</pre>
+ * <p>
+ * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * <p>
+ * This operation preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+ * the result {@code KTable}.
+ * <p>
+ * Note that {@code mapValues} for a <i>changelog stream</i> works different to {@link KStream#mapValues(ValueMapper)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+ * delete the corresponding record in the result {@code KTable}.
+ *
+ * @param mapper a {@link ValueMapper} that computes a new output value
+ * @param valueSerde serializer for new value type
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+ * @param <VR> the value type of the result {@code KTable}
+ * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ */
+ <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+ final Serde<VR> valueSerde,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+
+ /**
* Print the update records of this {@code KTable} to {@code System.out}.
* This function will use the generated name of the parent processor node to label the key/value pairs printed to
* the console.
@@ -156,7 +395,11 @@ public interface KTable<K, V> {
* <p>
* Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
* update record.
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#print()} on the result.
*/
+ @Deprecated
void print();
/**
@@ -173,7 +416,11 @@ public interface KTable<K, V> {
* update record.
*
* @param streamName the name used to label the key/value pairs printed to the console
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#print(String)} on the result.
*/
+ @Deprecated
void print(final String streamName);
/**
@@ -191,8 +438,12 @@ public interface KTable<K, V> {
* update record.
*
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
- * @param valSerde value serde used to deserialize value if type is {@code byte[]},
+ * @param valSerde value serde used to deserialize value if type is {@code byte[]}
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde)} on the result.
*/
+ @Deprecated
void print(final Serde<K> keySerde,
final Serde<V> valSerde);
@@ -212,7 +463,11 @@ public interface KTable<K, V> {
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
* @param valSerde value serde used to deserialize value if type is {@code byte[]},
* @param streamName the name used to label the key/value pairs printed to the console
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde, String)} on the result.
*/
+ @Deprecated
void print(final Serde<K> keySerde,
final Serde<V> valSerde,
final String streamName);
@@ -232,7 +487,11 @@ public interface KTable<K, V> {
* {@code KTable} update record.
*
* @param filePath name of file to write to
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String)} on the result.
*/
+ @Deprecated
void writeAsText(final String filePath);
/**
@@ -250,7 +509,11 @@ public interface KTable<K, V> {
*
* @param filePath name of file to write to
* @param streamName the name used to label the key/value pairs printed out to the console
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String)} on the result.
*/
+ @Deprecated
void writeAsText(final String filePath,
final String streamName);
@@ -270,8 +533,12 @@ public interface KTable<K, V> {
*
* @param filePath name of file to write to
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
- * @param valSerde value serde used to deserialize value if type is {@code byte[]},
+ * @param valSerde value serde used to deserialize value if type is {@code byte[]}
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, Serde, Serde)} on the result.
*/
+ @Deprecated
void writeAsText(final String filePath,
final Serde<K> keySerde,
final Serde<V> valSerde);
@@ -292,8 +559,13 @@ public interface KTable<K, V> {
* @param filePath name of file to write to
* @param streamName the name used to label the key/value pairs printed to the console
* @param keySerde key serde used to deserialize key if type is {@code byte[]},
- * @param valSerde value serde used to deserialize value if type is {@code byte[]},
+ * @param valSerde value serde used to deserialize value if type is {@code byte[]}
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String, Serde, Serde)} on the result.
+
*/
+ @Deprecated
void writeAsText(final String filePath,
final String streamName,
final Serde<K> keySerde,
@@ -307,7 +579,11 @@ public interface KTable<K, V> {
* {@code KTable} update record.
*
* @param action an action to perform on each record
+ * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+ * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+ * convert to a KStream using {@code toStream()} and then use {@link KStream#foreach(ForeachAction)} on the result.
*/
+ @Deprecated
void foreach(final ForeachAction<? super K, ? super V> action);
/**
@@ -361,12 +637,94 @@ public interface KTable<K, V> {
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
- * @param storeName the state store name used for the result {@code KTable}; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)}
+ * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+ */
+ KTable<K, V> through(final String topic,
+ final String queryableStoreName);
+
+ /**
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
+ * serializers and deserializers and producer's {@link DefaultPartitioner}.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * <p>
+ * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
+ * {@link KStreamBuilder#table(String, String)})
+ * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+ *
+ * @param topic the topic name
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
*/
KTable<K, V> through(final String topic,
- final String storeName);
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+ /**
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
+ * serializers and deserializers and producer's {@link DefaultPartitioner}.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}.
+ * <p>
+ * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
+ * {@link KStreamBuilder#table(String)})
+ *
+ * @param topic the topic name
+ * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+ */
+ KTable<K, V> through(final String topic);
+
+ /**
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
+ * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
+ * records to partitions.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}.
+ * <p>
+ * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
+ * {@link KStreamBuilder#table(String)})
+ *
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified producer's {@link DefaultPartitioner} will be used
+ * @param topic the topic name
+ * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+ */
+ KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic);
+
+ /**
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
+ * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
+ * records to partitions.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * <p>
+ * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
+ * {@link KStreamBuilder#table(String, String)})
+ *
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified producer's {@link DefaultPartitioner} will be used
+ * @param topic the topic name
+ * @param queryableStoreName the state store name used for the result {@code KTable}.
+ * If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)}
+ * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+ */
+ KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic,
+ final String queryableStoreName);
/**
* Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
@@ -384,12 +742,12 @@ public interface KTable<K, V> {
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
- * @param storeName the state store name used for the result {@code KTable}
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
*/
KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
- final String storeName);
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
* Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
@@ -410,43 +768,156 @@ public interface KTable<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
- * @param storeName the state store name used for the result {@code KTable}
+ * @param queryableStoreName the state store name used for the result {@code KTable}.
+ * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)}
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
*/
KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
final String topic,
- final String storeName);
+ final String queryableStoreName);
/**
- * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
- * {@link StreamPartitioner} to determine the distribution of records to partitions.
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
* The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
* started).
* <p>
- * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
- * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+ * If {@code keySerde} provides a {@link WindowedSerializer} for the key, {@link WindowedStreamPartitioner} is
+ * used—otherwise producer's {@link DefaultPartitioner} is used.
+ * <p>
+ * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
* {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
* {@link KStreamBuilder#table(String, String)})
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+ * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+ */
+ KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+ /**
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * If {@code keySerde} provides a {@link WindowedSerializer} for the key, {@link WindowedStreamPartitioner} is
+ * used—otherwise producer's {@link DefaultPartitioner} is used.
+ * <p>
+ * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
+ * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
+ * <p>
+ * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
+ * {@link KStreamBuilder#table(String)})
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name
+ * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+ */
+ KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+ final String topic);
+
+ /**
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
+ * {@link StreamPartitioner} to determine the distribution of records to partitions.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
+ * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * <p>
+ * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
+ * {@link KStreamBuilder#table(String, String)})
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+ * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
+ * be used
+ * @param topic the topic name
+ * @param queryableStoreName the state store name used for the result {@code KTable}.
+ * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)}
+ * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+ */
+ KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic,
+ final String queryableStoreName);
+
+ /**
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
+ * {@link StreamPartitioner} to determine the distribution of records to partitions.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
+ * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+ * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * <p>
+ * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
+ * {@link KStreamBuilder#table(String, String)})
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
* {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
* be used
* @param topic the topic name
- * @param storeName the state store name used for the result {@code KTable}
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
*/
KTable<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
- final String storeName);
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+ /**
+ * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
+ * {@link StreamPartitioner} to determine the distribution of records to partitions.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
+ * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+ * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
+ * <p>
+ * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
+ * {@link KStreamBuilder#table(String)})
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+ * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
+ * be used
+ * @param topic the topic name
+ * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+ */
+ KTable<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String topic);
+
/**
* Materialize this changelog stream to a topic using default serializers and deserializers and producer's
@@ -647,11 +1118,8 @@ public interface KTable<K, V> {
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
/**
- * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
- * non-windowed left equi join.
+ * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
- * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
- * an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
@@ -660,17 +1128,13 @@ public interface KTable<K, V> {
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
- * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
- * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
- * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
- * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
- * null} to compute a value (with arbitrary type) for the result record.
+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided
+ * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
- * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
- * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
- * deleted).
+ * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
+ * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
@@ -688,7 +1152,7 @@ public interface KTable<K, V> {
* <td><K1:A></td>
* <td></td>
* <td></td>
- * <td><K1:ValueJoiner(A,null)></td>
+ * <td></td>
* </tr>
* <tr>
* <td></td>
@@ -698,18 +1162,99 @@ public interface KTable<K, V> {
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
+ * <td><K1:C></td>
+ * <td><K1:C></td>
+ * <td></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(C,b)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K1:C></td>
* <td><K1:null></td>
* <td></td>
+ * <td><K1:null></td>
+ * </tr>
+ * </table>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+ * partitions.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param <VO> the value type of the other {@code KTable}
+ * @param <VR> the value type of the result {@code KTable}
+ * @param joinSerde serde for the join result value type
+ * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+ * used to subsequently query the operation results; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+ * (i.e., that would be equivalent to calling {@link KTable#join(KTable, ValueJoiner)}).
+ * @return a {@code KTable} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one for each matched record-pair with the same key
+ * @see #leftJoin(KTable, ValueJoiner)
+ * @see #outerJoin(KTable, ValueJoiner)
+ */
+ <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final Serde<VR> joinSerde,
+ final String queryableStoreName);
+
+ /**
+ * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join.
+ * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+ * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+ * of the join.
+ * <p>
+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+ * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+ * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+ * {@code KTable} the result gets updated.
+ * <p>
+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided
+ * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * <p>
+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+ * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
+ * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+ * <p>
+ * Input records with {@code null} key will be dropped and no join computation is performed.
+ * <p>
+ * Example:
+ * <table border='1'>
+ * <tr>
+ * <th>thisKTable</th>
+ * <th>thisState</th>
+ * <th>otherKTable</th>
+ * <th>otherState</th>
+ * <th>result update record</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td></td>
* <td></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K1:A></td>
* <td><K1:b></td>
- * <td><K1:null></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
+ * <td><K1:C></td>
+ * <td><K1:C></td>
* <td></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(C,b)></td>
+ * </tr>
+ * <tr>
* <td></td>
+ * <td><K1:C></td>
* <td><K1:null></td>
* <td></td>
- * <td></td>
+ * <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
@@ -719,21 +1264,23 @@ public interface KTable<K, V> {
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains join-records for each key and values computed by the given
- * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
- * left {@code KTable}
- * @see #join(KTable, ValueJoiner)
+ * {@link ValueJoiner}, one for each matched record-pair with the same key
+ * @see #leftJoin(KTable, ValueJoiner)
* @see #outerJoin(KTable, ValueJoiner)
*/
- <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
- final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
+ <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
- * non-windowed outer equi join.
+ * non-windowed left equi join.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
- * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
- * all records from both input {@code KTable}s will produce an output record (cf. below).
+ * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
+ * an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
@@ -744,14 +1291,15 @@ public interface KTable<K, V> {
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
- * Additionally, for each record that does not find a corresponding record in the corresponding other
- * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
- * corresponding other value to compute a value (with arbitrary type) for the result record.
+ * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
+ * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
+ * null} to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
- * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
- * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+ * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
+ * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
+ * deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
@@ -783,14 +1331,14 @@ public interface KTable<K, V> {
* <td></td>
* <td></td>
* <td><K1:b></td>
- * <td><K1:ValueJoiner(null,b)></td>
+ * <td><K1:null></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
- * <td><K1:null></td>
+ * <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
@@ -802,17 +1350,443 @@ public interface KTable<K, V> {
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
- * both {@code KTable}s
+ * left {@code KTable}
* @see #join(KTable, ValueJoiner)
- * @see #leftJoin(KTable, ValueJoiner)
+ * @see #outerJoin(KTable, ValueJoiner)
*/
- <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
- final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
+ <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
+
+ /**
+ * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+ * non-windowed left equi join.
+ * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+ * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
+ * an output record (cf. below).
+ * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+ * of the join.
+ * <p>
+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+ * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+ * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+ * {@code KTable} the result gets updated.
+ * <p>
+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+ * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
+ * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
+ * null} to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * <p>
+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+ * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
+ * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
+ * deleted).
+ * <p>
+ * Input records with {@code null} key will be dropped and no join computation is performed.
+ * <p>
+ * Example:
+ * <table border='1'>
+ * <tr>
+ * <th>thisKTable</th>
+ * <th>thisState</th>
+ * <th>otherKTable</th>
+ * <th>otherState</th>
+ * <th>result update record</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:ValueJoiner(A,null)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K1:A></td>
+ * <td><K1:b></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(A,b)></td>
+ * </tr>
+ * <tr>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:b></td>
+ * <td><K1:null></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td></td>
+ * </tr>
+ * </table>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+ * partitions.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param <VO> the value type of the other {@code KTable}
+ * @param <VR> the value type of the result {@code KTable}
+ * @param joinSerde serde for the join result value type
+ * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+ * used to subsequently query the operation results; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+ * (i.e., that would be equivalent to calling {@link KTable#leftJoin(KTable, ValueJoiner)}).
+ * @return a {@code KTable} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+ * left {@code KTable}
+ * @see #join(KTable, ValueJoiner)
+ * @see #outerJoin(KTable, ValueJoiner)
+ */
+ <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final Serde<VR> joinSerde,
+ final String queryableStoreName);
+
+ /**
+ * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+ * non-windowed left equi join.
+ * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+ * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
+ * an output record (cf. below).
+ * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+ * of the join.
+ * <p>
+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+ * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+ * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+ * {@code KTable} the result gets updated.
+ * <p>
+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+ * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
+ * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
+ * null} to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * <p>
+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+ * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
+ * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
+ * deleted).
+ * <p>
+ * Input records with {@code null} key will be dropped and no join computation is performed.
+ * <p>
+ * Example:
+ * <table border='1'>
+ * <tr>
+ * <th>thisKTable</th>
+ * <th>thisState</th>
+ * <th>otherKTable</th>
+ * <th>otherState</th>
+ * <th>result update record</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:ValueJoiner(A,null)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K1:A></td>
+ * <td><K1:b></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(A,b)></td>
+ * </tr>
+ * <tr>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:b></td>
+ * <td><K1:null></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td></td>
+ * </tr>
+ * </table>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+ * partitions.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param <VO> the value type of the other {@code KTable}
+ * @param <VR> the value type of the result {@code KTable}
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+ * @return a {@code KTable} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+ * left {@code KTable}
+ * @see #join(KTable, ValueJoiner)
+ * @see #outerJoin(KTable, ValueJoiner)
+ */
+ <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+
+ /**
+ * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+ * non-windowed outer equi join.
+ * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+ * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
+ * all records from both input {@code KTable}s will produce an output record (cf. below).
+ * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+ * of the join.
+ * <p>
+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+ * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+ * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+ * {@code KTable} the result gets updated.
+ * <p>
+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+ * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * Additionally, for each record that does not find a corresponding record in the corresponding other
+ * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
+ * corresponding other value to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * <p>
+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+ * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+ * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+ * <p>
+ * Input records with {@code null} key will be dropped and no join computation is performed.
+ * <p>
+ * Example:
+ * <table border='1'>
+ * <tr>
+ * <th>thisKTable</th>
+ * <th>thisState</th>
+ * <th>otherKTable</th>
+ * <th>otherState</th>
+ * <th>result update record</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:ValueJoiner(A,null)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K1:A></td>
+ * <td><K1:b></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(A,b)></td>
+ * </tr>
+ * <tr>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(null,b)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td><K1:null></td>
+ * </tr>
+ * </table>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+ * partitions.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param <VO> the value type of the other {@code KTable}
+ * @param <VR> the value type of the result {@code KTable}
+ * @return a {@code KTable} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+ * both {@code KTable}s
+ * @see #join(KTable, ValueJoiner)
+ * @see #leftJoin(KTable, ValueJoiner)
+ */
+ <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
+
+ /**
+ * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+ * non-windowed outer equi join.
+ * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+ * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
+ * all records from both input {@code KTable}s will produce an output record (cf. below).
+ * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+ * of the join.
+ * <p>
+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+ * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+ * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+ * {@code KTable} the result gets updated.
+ * <p>
+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+ * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * Additionally, for each record that does not find a corresponding record in the other
+ * {@code KTable}'s state, the provided {@link ValueJoiner} will be called with {@code null} for the
+ * missing other value to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * <p>
+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+ * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+ * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+ * <p>
+ * Input records with {@code null} key will be dropped and no join computation is performed.
+ * <p>
+ * Example:
+ * <table border='1'>
+ * <tr>
+ * <th>thisKTable</th>
+ * <th>thisState</th>
+ * <th>otherKTable</th>
+ * <th>otherState</th>
+ * <th>result update record</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:ValueJoiner(A,null)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K1:A></td>
+ * <td><K1:b></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(A,b)></td>
+ * </tr>
+ * <tr>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(null,b)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td><K1:null></td>
+ * </tr>
+ * </table>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+ * partitions.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param joinSerde serde for the join result value type
+ * @param queryableStoreName a user-provided name of the underlying state store that can be
+ * used to subsequently query the operation results; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+ * (i.e., that would be equivalent to calling {@link KTable#outerJoin(KTable, ValueJoiner)}).
+ * @param <VO> the value type of the other {@code KTable}
+ * @param <VR> the value type of the result {@code KTable}
+ * @return a {@code KTable} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+ * both {@code KTable}s
+ * @see #join(KTable, ValueJoiner)
+ * @see #leftJoin(KTable, ValueJoiner)
+ */
+ <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final Serde<VR> joinSerde,
+ final String queryableStoreName);
+
+ /**
+ * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+ * non-windowed outer equi join.
+ * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+ * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
+ * all records from both input {@code KTable}s will produce an output record (cf. below).
+ * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+ * of the join.
+ * <p>
+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+ * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+ * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+ * {@code KTable} the result gets updated.
+ * <p>
+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+ * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * Additionally, for each record that does not find a corresponding record in the other
+ * {@code KTable}'s state, the provided {@link ValueJoiner} will be called with {@code null} for the
+ * missing other value to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * <p>
+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+ * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+ * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+ * <p>
+ * Input records with {@code null} key will be dropped and no join computation is performed.
+ * <p>
+ * Example:
+ * <table border='1'>
+ * <tr>
+ * <th>thisKTable</th>
+ * <th>thisState</th>
+ * <th>otherKTable</th>
+ * <th>otherState</th>
+ * <th>result update record</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:ValueJoiner(A,null)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K1:A></td>
+ * <td><K1:b></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(A,b)></td>
+ * </tr>
+ * <tr>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td></td>
+ * <td><K1:b></td>
+ * <td><K1:ValueJoiner(null,b)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td><K1:null></td>
+ * <td></td>
+ * <td><K1:null></td>
+ * </tr>
+ * </table>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+ * partitions.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param storeSupplier user-defined state store supplier; cannot be {@code null}
+ * @param <VO> the value type of the other {@code KTable}
+ * @param <VR> the value type of the result {@code KTable}
+ * @return a {@code KTable} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+ * both {@code KTable}s
+ * @see #join(KTable, ValueJoiner)
+ * @see #leftJoin(KTable, ValueJoiner)
+ */
+ <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+ final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
- * Get the name of the local state store used for materializing this {@code KTable}.
+ * Get the name of the local state store that can be used to query this {@code KTable}.
*
- * @return the underlying state store name, or {@code null} if this {@code KTable} is not materialized
+ * @return the underlying state store name, or {@code null} if this {@code KTable} cannot be queried.
*/
- String getStoreName();
+ String queryableStoreName();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index dce5d12..8aea44d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -47,6 +47,7 @@ public abstract class AbstractStream<K> {
this.sourceNodes = sourceNodes;
}
+
Set<String> ensureJoinableWith(final AbstractStream<K> other) {
Set<String> allSourceNodes = new HashSet<>();
allSourceNodes.addAll(sourceNodes);
@@ -57,6 +58,12 @@ public abstract class AbstractStream<K> {
return allSourceNodes;
}
+ String getOrCreateName(final String queryableStoreName, final String prefix) {
+ final String returnName = queryableStoreName != null ? queryableStoreName : topology.newStoreName(prefix);
+ Topic.validate(returnName);
+ return returnName;
+ }
+
static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
return new ValueJoiner<T2, T1, R>() {
@Override