You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 23:16:00 UTC

[3/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for queryable stores

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 290142b..e6219c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -24,7 +24,10 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 /**
@@ -47,7 +50,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  *     final KafkaStreams streams = ...;
  *     streams.start()
  *     ...
- *     final String queryableStoreName = table.getStoreName(); // returns null if KTable is not queryable
+ *     final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
  *     ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
  *     view.get(key);
  *}</pre>
@@ -87,6 +90,79 @@ public interface KTable<K, V> {
     KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
 
     /**
+     * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
+     * predicate.
+     * All records that do not satisfy the predicate are dropped.
+     * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+     * result {@code KTable}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * Note that {@code filter} for a <i>changelog stream</i> works different to {@link KStream#filter(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record
+     * is forwarded.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     *
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+     *                          used to subsequently query the operation results; valid characters are ASCII
+     *                          alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+     *                          (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}.
+     * @return a {@code KTable} that contains only those records that satisfy the given predicate
+     * @see #filterNot(Predicate)
+     */
+    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
+
+    /**
+     * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
+     * predicate.
+     * All records that do not satisfy the predicate are dropped.
+     * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+     * result {@code KTable}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * Note that {@code filter} for a <i>changelog stream</i> works different to {@link KStream#filter(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record
+     * is forwarded.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     *
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@code KTable} that contains only those records that satisfy the given predicate
+     * @see #filterNot(Predicate)
+     */
+    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    /**
      * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
      * given predicate.
      * All records that <em>do</em> satisfy the predicate are dropped.
@@ -109,6 +185,78 @@ public interface KTable<K, V> {
     KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
 
     /**
+     * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
+     * given predicate.
+     * All records that <em>do</em> satisfy the predicate are dropped.
+     * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+     * result {@code KTable}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * Note that {@code filterNot} for a <i>changelog stream</i> works different to {@link KStream#filterNot(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+     * forwarded.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
+     * @see #filter(Predicate)
+     */
+    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    /**
+     * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
+     * given predicate.
+     * All records that <em>do</em> satisfy the predicate are dropped.
+     * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+     * result {@code KTable}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * Note that {@code filterNot} for a <i>changelog stream</i> works different to {@link KStream#filterNot(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+     * forwarded.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+     * used to subsequently query the operation results; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+     * (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}.
+     * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
+     * @see #filter(Predicate)
+     */
+    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
+
+
+    /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
      * (with possible new type)in the new {@code KTable}.
      * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
@@ -144,6 +292,97 @@ public interface KTable<K, V> {
 
 
     /**
+     * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+     * (with possible new type)in the new {@code KTable}.
+     * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
+     * computes a new value for it, resulting in an update record for the result {@code KTable}.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * The example below counts the number of token of the value string.
+     * <pre>{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapper<String, Integer> {
+     *     Integer apply(String value) {
+     *         return value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * <p>
+     * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * <p>
+     * This operation preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+     * the result {@code KTable}.
+     * <p>
+     * Note that {@code mapValues} for a <i>changelog stream</i> works different to {@link KStream#mapValues(ValueMapper)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+     * delete the corresponding record in the result {@code KTable}.
+     *
+     * @param mapper a {@link ValueMapper} that computes a new output value
+     * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+     * used to subsequently query the operation results; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+     * (i.e., that would be equivalent to calling {@link KTable#mapValues(ValueMapper)}.
+     * @param valueSerde serializer for new value type
+     * @param <VR>   the value type of the result {@code KTable}
+     *
+     * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+     */
+    <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
+
+    /**
+     * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+     * (with possible new type)in the new {@code KTable}.
+     * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
+     * computes a new value for it, resulting in an update record for the result {@code KTable}.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * The example below counts the number of token of the value string.
+     * <pre>{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapper<String, Integer> {
+     *     Integer apply(String value) {
+     *         return value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * <p>
+     * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * <p>
+     * This operation preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+     * the result {@code KTable}.
+     * <p>
+     * Note that {@code mapValues} for a <i>changelog stream</i> works different to {@link KStream#mapValues(ValueMapper)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+     * delete the corresponding record in the result {@code KTable}.
+     *
+     * @param mapper a {@link ValueMapper} that computes a new output value
+     * @param valueSerde serializer for new value type
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @param <VR>   the value type of the result {@code KTable}
+     * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+     */
+    <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+                                 final Serde<VR> valueSerde,
+                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+
+    /**
      * Print the update records of this {@code KTable} to {@code System.out}.
      * This function will use the generated name of the parent processor node to label the key/value pairs printed to
      * the console.
@@ -156,7 +395,11 @@ public interface KTable<K, V> {
      * <p>
      * Note that {@code print()} is not applied to the internal state store and only called for each new {@code KTable}
      * update record.
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#print()} on the result.
      */
+    @Deprecated
     void print();
 
     /**
@@ -173,7 +416,11 @@ public interface KTable<K, V> {
      * update record.
      *
      * @param streamName the name used to label the key/value pairs printed to the console
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#print(String)} on the result.
      */
+    @Deprecated
     void print(final String streamName);
 
     /**
@@ -191,8 +438,12 @@ public interface KTable<K, V> {
      * update record.
      *
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
-     * @param valSerde value serde used to deserialize value if type is {@code byte[]},
+     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde)} on the result.
      */
+    @Deprecated
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde);
 
@@ -212,7 +463,11 @@ public interface KTable<K, V> {
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
      * @param streamName the name used to label the key/value pairs printed to the console
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde, Serde, String)} on the result.
      */
+    @Deprecated
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde,
                final String streamName);
@@ -232,7 +487,11 @@ public interface KTable<K, V> {
      * {@code KTable} update record.
      *
      * @param filePath name of file to write to
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String)}} on the result.
      */
+    @Deprecated
     void writeAsText(final String filePath);
 
     /**
@@ -250,7 +509,11 @@ public interface KTable<K, V> {
      *
      * @param filePath   name of file to write to
      * @param streamName the name used to label the key/value pairs printed out to the console
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String)}} on the result.
      */
+    @Deprecated
     void writeAsText(final String filePath,
                      final String streamName);
 
@@ -270,8 +533,12 @@ public interface KTable<K, V> {
      *
      * @param filePath name of file to write to
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
-     * @param valSerde value serde used to deserialize value if type is {@code byte[]},
+     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, Serde, Serde)}} on the result.
      */
+    @Deprecated
     void  writeAsText(final String filePath,
                       final Serde<K> keySerde,
                       final Serde<V> valSerde);
@@ -292,8 +559,13 @@ public interface KTable<K, V> {
      * @param filePath name of file to write to
      * @param streamName the name used to label the key/value pairs printed to the console
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
-     * @param valSerde value serde used to deserialize value if type is {@code byte[]},
+     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String, String, Serde, Serde)}} on the result.
+
      */
+    @Deprecated
     void writeAsText(final String filePath,
                      final String streamName,
                      final Serde<K> keySerde,
@@ -307,7 +579,11 @@ public interface KTable<K, V> {
      * {@code KTable} update record.
      *
      * @param action an action to perform on each record
+     * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String, QueryableStoreType) }
+     * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable. Alternatively
+     * convert to a KStream using {@code toStream()} and then use {@link KStream#foreach(ForeachAction)}} on the result.
      */
+    @Deprecated
     void foreach(final ForeachAction<? super K, ? super V> action);
 
     /**
@@ -361,12 +637,94 @@ public interface KTable<K, V> {
      * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
-     * @param storeName the state store name used for the result {@code KTable}; valid characters are ASCII
-     *                  alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII
+     *                  alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)()}
+     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+     */
+    KTable<K, V> through(final String topic,
+                         final String queryableStoreName);
+
+    /**
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
+     * serializers and deserializers and producer's {@link DefaultPartitioner}.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * <p>
+     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
+     * {@link KStreamBuilder#table(String, String)})
+     * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+     *
+     * @param topic     the topic name
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
      */
     KTable<K, V> through(final String topic,
-                         final String storeName);
+                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    /**
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
+     * serializers and deserializers and producer's {@link DefaultPartitioner}.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}.
+     * <p>
+     * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
+     * {@link KStreamBuilder#table(String)})
+     *
+     * @param topic     the topic name
+     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+     */
+    KTable<K, V> through(final String topic);
+
+    /**
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
+     * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
+     * records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}.
+     * <p>
+     * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
+     * {@link KStreamBuilder#table(String)})
+     *
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified producer's {@link DefaultPartitioner} will be used
+     * @param topic       the topic name
+     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+     */
+    KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+                         final String topic);
+
+    /**
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
+     * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
+     * records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * <p>
+     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
+     * {@link KStreamBuilder#table(String, String)})
+     *
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified producer's {@link DefaultPartitioner} will be used
+     * @param topic       the topic name
+     * @param queryableStoreName   the state store name used for the result {@code KTable}.
+     *                             If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)}
+     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+     */
+    KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+                         final String topic,
+                         final String queryableStoreName);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default
@@ -384,12 +742,12 @@ public interface KTable<K, V> {
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
-     * @param storeName   the state store name used for the result {@code KTable}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
      */
     KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic,
-                         final String storeName);
+                         final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
      * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
@@ -410,43 +768,156 @@ public interface KTable<K, V> {
      * @param valSerde  value serde used to send key-value pairs,
      *                  if not specified the default value serde defined in the configuration will be used
      * @param topic     the topic name
-     * @param storeName the state store name used for the result {@code KTable}
+     * @param queryableStoreName the state store name used for the result {@code KTable}.
+     *                           If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)()}
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
      */
     KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
                          final String topic,
-                         final String storeName);
+                         final String queryableStoreName);
 
     /**
-     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
-     * {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
      * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
      * started).
      * <p>
-     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
-     * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+     * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
+     * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
+     * <p>
+     * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
      * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
      * {@link KStreamBuilder#table(String, String)})
      *
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valSerde    value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
-     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+     */
+    KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+                         final String topic,
+                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    /**
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
+     * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
+     * <p>
+     * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
+     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
+     * <p>
+     * The resulting {@code KTable} will be materialized in a local state store with an interna; store name (cf.
+     * {@link KStreamBuilder#table(String)})
+     *
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name
+     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+     */
+    KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde,
+                         final String topic);
+
+    /**
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
+     * {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
+     * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * <p>
+     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
+     * {@link KStreamBuilder#table(String, String)})
+     *
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in the configuration will be used
+     * @param valSerde    value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in the configuration will be used
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+     *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
+     *                    be used
+     * @param topic      the topic name
+     * @param queryableStoreName  the state store name used for the result {@code KTable}.
+     *                            If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)()}
+     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+     */
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
+                         final StreamPartitioner<? super K, ? super V> partitioner,
+                         final String topic,
+                         final String queryableStoreName);
+
+    /**
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
+     * {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
+     * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * <p>
+     * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
+     * {@link KStreamBuilder#table(String, String)})
+     *
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in the configuration will be used
+     * @param valSerde    value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in the configuration will be used
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
      *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
      *                    be used
      * @param topic      the topic name
-     * @param storeName  the state store name used for the result {@code KTable}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
      */
     KTable<K, V> through(final Serde<K> keySerde,
                          final Serde<V> valSerde,
                          final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic,
-                         final String storeName);
+                         final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    /**
+     * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable
+     * {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
+     * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
+     * <p>
+     * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
+     * {@link KStreamBuilder#table(String)})
+     *
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in the configuration will be used
+     * @param valSerde    value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in the configuration will be used
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+     *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
+     *                    be used
+     * @param topic      the topic name
+     * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
+     */
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
+                         final StreamPartitioner<? super K, ? super V> partitioner,
+                         final String topic);
+
 
     /**
      * Materialize this changelog stream to a topic using default serializers and deserializers and producer's
@@ -647,11 +1118,8 @@ public interface KTable<K, V> {
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
 
     /**
-     * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
-     * non-windowed left equi join.
+     * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join.
      * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
-     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
-     * an output record (cf. below).
      * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
      * of the join.
      * <p>
@@ -660,17 +1128,13 @@ public interface KTable<K, V> {
      * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
      * {@code KTable} the result gets updated.
      * <p>
-     * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
-     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
-     * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
-     * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
-     * null} to compute a value (with arbitrary type) for the result record.
+     * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
      * The key of the result record is the same as for both joining input records.
      * <p>
      * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
-     * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
-     * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
-     * deleted).
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
+     * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
      * <p>
      * Input records with {@code null} key will be dropped and no join computation is performed.
      * <p>
@@ -688,7 +1152,7 @@ public interface KTable<K, V> {
      * <td>&lt;K1:A&gt;</td>
      * <td></td>
      * <td></td>
-     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * <td></td>
      * </tr>
      * <tr>
      * <td></td>
@@ -698,18 +1162,99 @@ public interface KTable<K, V> {
      * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
      * </tr>
      * <tr>
+     * <td>&lt;K1:C&gt;</td>
+     * <td>&lt;K1:C&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:C&gt;</td>
      * <td>&lt;K1:null&gt;</td>
      * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+     * partitions.
+     *
+     * @param other  the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@code KTable}
+     * @param <VR>   the value type of the result {@code KTable}
+     * @param joinSerde serializer for join result value type
+     * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+     * used to subsequently query the operation results; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+     * (i.e., that would be equivalent to calling {@link KTable#join(KTable, ValueJoiner)}.
+     * @return a {@code KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key
+     * @see #leftJoin(KTable, ValueJoiner)
+     * @see #outerJoin(KTable, ValueJoiner)
+     */
+    <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
+                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                final Serde<VR> joinSerde,
+                                final String queryableStoreName);
+
+    /**
+     * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
+     * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
      * <td></td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
      * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:null&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
      * </tr>
      * <tr>
+     * <td>&lt;K1:C&gt;</td>
+     * <td>&lt;K1:C&gt;</td>
      * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
+     * </tr>
+     * <tr>
      * <td></td>
+     * <td>&lt;K1:C&gt;</td>
      * <td>&lt;K1:null&gt;</td>
      * <td></td>
-     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
      * </tr>
      * </table>
      * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
@@ -719,21 +1264,23 @@ public interface KTable<K, V> {
      * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
      * @param <VO>   the value type of the other {@code KTable}
      * @param <VR>   the value type of the result {@code KTable}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@code KTable} that contains join-records for each key and values computed by the given
-     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
-     * left {@code KTable}
-     * @see #join(KTable, ValueJoiner)
+     * {@link ValueJoiner}, one for each matched record-pair with the same key
+     * @see #leftJoin(KTable, ValueJoiner)
      * @see #outerJoin(KTable, ValueJoiner)
      */
-    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
-                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
+    <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
+                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                final StateStoreSupplier<KeyValueStore> storeSupplier);
+
 
     /**
      * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
-     * non-windowed outer equi join.
+     * non-windowed left equi join.
      * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
-     * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
-     * all records from both input {@code KTable}s will produce an output record (cf. below).
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
+     * an output record (cf. below).
      * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
      * of the join.
      * <p>
@@ -744,14 +1291,15 @@ public interface KTable<K, V> {
      * <p>
      * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
      * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
-     * Additionally, for each record that does not find a corresponding record in the corresponding other
-     * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
-     * corresponding other value to compute a value (with arbitrary type) for the result record.
+     * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
+     * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
+     * null} to compute a value (with arbitrary type) for the result record.
      * The key of the result record is the same as for both joining input records.
      * <p>
      * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
-     * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
-     * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+     * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
+     * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
+     * deleted).
      * <p>
      * Input records with {@code null} key will be dropped and no join computation is performed.
      * <p>
@@ -783,14 +1331,14 @@ public interface KTable<K, V> {
      * <td></td>
      * <td></td>
      * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoiner(null,b)&gt;</td>
+     * <td>&lt;K1:null&gt;</td>
      * </tr>
      * <tr>
      * <td></td>
      * <td></td>
      * <td>&lt;K1:null&gt;</td>
      * <td></td>
-     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
      * </tr>
      * </table>
      * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
@@ -802,17 +1350,443 @@ public interface KTable<K, V> {
      * @param <VR>   the value type of the result {@code KTable}
      * @return a {@code KTable} that contains join-records for each key and values computed by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
-     * both {@code KTable}s
+     * left {@code KTable}
      * @see #join(KTable, ValueJoiner)
-     * @see #leftJoin(KTable, ValueJoiner)
+     * @see #outerJoin(KTable, ValueJoiner)
      */
-    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
-                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
+    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
+
+    /**
+     * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+     * non-windowed left equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
+     * an output record (cf. below).
+     * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
+     * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
+     * null} to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
+     * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
+     * deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+     * partitions.
+     *
+     * @param other  the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@code KTable}
+     * @param <VR>   the value type of the result {@code KTable}
+     * @param joinSerde serializer for join result value type
+     * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+     * used to subsequently query the operation results; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+     * (i.e., that would be equivalent to calling {@link KTable#leftJoin(KTable, ValueJoiner)}.
+     * @return a {@code KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+     * left {@code KTable}
+     * @see #join(KTable, ValueJoiner)
+     * @see #outerJoin(KTable, ValueJoiner)
+     */
+    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                    final Serde<VR> joinSerde,
+                                    final String queryableStoreName);
+
+    /**
+     * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+     * non-windowed left equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
+     * an output record (cf. below).
+     * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
+     * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
+     * null} to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
+     * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
+     * deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+     * partitions.
+     *
+     * @param other  the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@code KTable}
+     * @param <VR>   the value type of the result {@code KTable}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@code KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+     * left {@code KTable}
+     * @see #join(KTable, ValueJoiner)
+     * @see #outerJoin(KTable, ValueJoiner)
+     */
+    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                    final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+
+    /**
+     * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+     * non-windowed outer equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
+     * all records from both input {@code KTable}s will produce an output record (cf. below).
+     * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * Additionally, for each record that does not find a corresponding record in the corresponding other
+     * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
+     * corresponding other value to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+     * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(null,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+     * partitions.
+     *
+     * @param other  the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@code KTable}
+     * @param <VR>   the value type of the result {@code KTable}
+     * @return a {@code KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+     * both {@code KTable}s
+     * @see #join(KTable, ValueJoiner)
+     * @see #leftJoin(KTable, ValueJoiner)
+     */
+    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
+
+    /**
+     * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+     * non-windowed outer equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
+     * all records from both input {@code KTable}s will produce an output record (cf. below).
+     * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * Additionally, for each record that does not find a corresponding record in the corresponding other
+     * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
+     * corresponding other value to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+     * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(null,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+     * partitions.
+     *
+     * @param other  the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@code KTable}
+     * @param <VR>   the value type of the result {@code KTable}
+     * @param joinSerde serializer for join result value type
+     * @param queryableStoreName a user-provided name of the underlying {@link KTable} that can be
+     * used to subsequently query the operation results; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
+     * (i.e., that would be equivalent to calling {@link KTable#outerJoin(KTable, ValueJoiner)}.
+     * @return a {@code KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+     * both {@code KTable}s
+     * @see #join(KTable, ValueJoiner)
+     * @see #leftJoin(KTable, ValueJoiner)
+     */
+    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                     final Serde<VR> joinSerde,
+                                     final String queryableStoreName);
+
+    /**
+     * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
+     * non-windowed outer equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
+     * all records from both input {@code KTable}s will produce an output record (cf. below).
+     * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@code KTable} the result gets updated.
+     * <p>
+     * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * Additionally, for each record that does not find a corresponding record in the corresponding other
+     * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
+     * corresponding other value to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+     * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+     * <p>
+     * Input records with {@code null} key will be dropped and no join computation is performed.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(null,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
+     * partitions.
+     *
+     * @param other  the other {@code KTable} to be joined with this {@code KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@code KTable}
+     * @param <VR>   the value type of the result {@code KTable}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@code KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+     * both {@code KTable}s
+     * @see #join(KTable, ValueJoiner)
+     * @see #leftJoin(KTable, ValueJoiner)
+     */
+    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
+                                     final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
-     * Get the name of the local state store used for materializing this {@code KTable}.
+     * Get the name of the local state store used that can be used to query this {@code KTable}.
      *
-     * @return the underlying state store name, or {@code null} if this {@code KTable} is not materialized
+     * @return the underlying state store name, or {@code null} if this {@code KTable} cannot be queried.
      */
-    String getStoreName();
+    String queryableStoreName();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index dce5d12..8aea44d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -47,6 +47,7 @@ public abstract class AbstractStream<K> {
         this.sourceNodes = sourceNodes;
     }
 
+
     Set<String> ensureJoinableWith(final AbstractStream<K> other) {
         Set<String> allSourceNodes = new HashSet<>();
         allSourceNodes.addAll(sourceNodes);
@@ -57,6 +58,12 @@ public abstract class AbstractStream<K> {
         return allSourceNodes;
     }
 
+    String getOrCreateName(final String queryableStoreName, final String prefix) {
+        final String returnName = queryableStoreName != null ? queryableStoreName : topology.newStoreName(prefix);
+        Topic.validate(returnName);
+        return returnName;
+    }
+
     static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
         return new ValueJoiner<T2, T1, R>() {
             @Override