Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/13 23:23:38 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r424758763



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
     <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
+                        processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
+                    <li>A predefined persistent key-value state store is added and connected to the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor it is connected to to the topology.
+                    This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
+                </p>
+                <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Topology</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic &quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">&quot;Source&quot;</span><span class="o">,</span> <span class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source processor as its upstream processor, along with its state store</span>
+    <span class="o">builder.addProcessor("Process", new ProcessorSupplier() { public Processor&lt;String, String&gt; get() { return new WordCountProcessor(); } public Set&lt;StoreBuilder&gt; stores() { return countStoreBuilder; } });</span>
+
+    <span class="c1">// add the sink processor node that takes Kafka topic &quot;sink-topic&quot; as output</span>
+    <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
+    <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
+</pre></div>
+                    <p>This allows for a processor to "own" state stores, effectively encapsulating its usage from the user wiring the topology.
+                        Multiple processors that share a state store may provide the same store with this technique, as long as the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> is the same instance.</p>
+                    <p>In these topologies, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
+                        upstream processor of the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> node.  As a result, whenever the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched record from
+                        Kafka to its downstream <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and
+                        update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the
+                        <code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> processor node to
+                        the Kafka topic <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code>.  Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the
+                        same store name <code class="docutils literal"><span class="pre">&quot;Counts&quot;</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime,
+                        indicating that the state store cannot be found. If the state store is not associated with the processor
+                        in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
+                        runtime, indicating the state store is not accessible from this processor.</p>
+                    <p>Now that you have fully defined your processor topology in your application, you can proceed to
+                        <a class="reference internal" href="running-app.html#streams-developer-guide-execution"><span class="std std-ref">running the Kafka Streams application</span></a>.</p>
+                </div>

Review comment:
       Meta comment: can you also extend `streams/upgrade-guide.html`? There is a section on "public API" changes for the 2.6 release.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3164,10 +3164,17 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
                 <div class="admonition tip">
                     <p><b>Tip</b></p>
                     <p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
-                        calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>.  Only such state stores are available that (1) have been named in the
-                        corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call (note that this is a different method than <code class="docutils literal"><span class="pre">Processor#process()</span></code>),
-                        plus (2) all global stores.  Note that global stores do not need to be attached explicitly;  however, they only
-                        allow for read-only access.</p>
+                        calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>.
+                        State stores are only available if they have been connected to the processor, or if they are global stores.  While global stores do not need to be connected explicitly, they only allow for read-only access.
+                        There are two ways to connect state stores to a processor:
+                    <ul class="simple">
+                        <li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li>
+                        <li>Implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                            passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>.  In this case there is no need to call <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>
+                            beforehand, the store will be automatically added for you.

Review comment:
       nit: `,` -> `;` ?

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
     <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
+                        processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
+                    <li>A predefined persistent key-value state store is added and connected to the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor it is connected to to the topology.
+                    This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:

Review comment:
       nit: `in place` -> `instead` ?

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Demonstrates, using a {@link Transformer} which combines the low-level Processor APIs with the high-level KStream DSL,

Review comment:
       `KStream` -> `Kafka Streams`
   
   (`KStream` is not an appropriate abbreviation for Kafka Streams, because the DSL uses the abstraction of `KStream` and `KTable` and thus it would be ambiguous.)

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
     <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
+                        processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
+                    <li>A predefined persistent key-value state store is added and connected to the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor it is connected to to the topology.
+                    This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
+                </p>
+                <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Topology</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic &quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">&quot;Source&quot;</span><span class="o">,</span> <span class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source processor as its upstream processor, along with its state store</span>
+    <span class="o">builder.addProcessor("Process", new ProcessorSupplier() { public Processor&lt;String, String&gt; get() { return new WordCountProcessor(); } public Set&lt;StoreBuilder&gt; stores() { return countStoreBuilder; } });</span>

Review comment:
       Can we split this code (i.e., the definition of `ProcessorSupplier`) across multiple lines?
   ```
   builder.addProcessor("Process", new ProcessorSupplier() {
       public Processor&lt;String, String&gt; get() {
           return new WordCountProcessor();
       }
       public Set&lt;StoreBuilder&gt; stores() { 
           return countStoreBuilder;
       }
   });
   ```
   
   Also note that in the original example above, there is an indentation because the method calls are chained:
   ```
   builder.source()
       .process()
       .addStateStore()
       .addSink();
   ```
   
   Either chain _all_ method calls here, too, or chain none. If you don't chain, remove the indentation. Currently it would render as follows (including an incorrect `;` after `addProcessor()`):
   ```
   builder.addSource();
       builder.addProcessor();
       .addSink();
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -1915,43 +2061,66 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * can be observed and additional periodic actions can be performed.
      * Note that this is a terminal operation that returns void.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is available by default):
-     * <pre>{@code
+     * In order for the processor to use state stores, the stores must be added to the topology and connected to the
+     * processor using at least one of two strategies (though it's not required to connect global state stores; read-only
+     * access to global state stores is available by default).
+     * <p>
+     * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
+     * and specify the store names via {@code stateStoreNames} so they will be connected to the processor.
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
      *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
      *                 Serdes.String(),
      *                 Serdes.String());
-     * // register store
+     * // add store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
+     * inputStream.process(new ProcessorSupplier() { public Processor get() { return new MyProcessor(); } }, "myProcessorState");
      * }</pre>
-     * Within the {@link Processor}, the state is obtained via the
+     * The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()},
+     * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
+     * <pre>{@code
+     * class MyProcessorSupplier implements ProcessorSupplier {
+     *     // supply processor
+     *     Processor get() {
+     *         return new MyProcessor();
+     *     }
+     *
+     *     // provide store(s) that will be added and connected to the associated processor
+     *     Set<StoreBuilder> stores() {
+     *         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
+     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
+     *                   Serdes.String(),
+     *                   Serdes.String());
+     *         return Collections.singleton(keyValueStoreBuilder);
+     *     }
+     * }
+     *
+     * ...
+     *
+     * inputStream.process(new MyProcessorSupplier());
+     * }</pre>
+     * <p>
+     * With either strategy, within the {@link Processor}, the state is obtained via the
      * {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
      * <pre>{@code
-     * new ProcessorSupplier() {
-     *     Processor get() {
-     *         return new Processor() {
-     *             private StateStore state;
+     * class MyProcessor implements Processor {
+     *     private StateStore state;
      *
-     *             void init(ProcessorContext context) {
-     *                 this.state = context.getStateStore("myProcessorState");
-     *                 // punctuate each second, can access this.state
-     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *             }
+     *     void init(ProcessorContext context) {
+     *         this.state = context.getStateStore("myProcessorState");
+     *         // punctuate each second, can access this.state
+     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *     }
      *
-     *             void process(K key, V value) {
-     *                 // can access this.state
-     *             }
+     *     void process(K key, V value) {
+     *         // can access this.state
+     *     }
      *
-     *             void close() {
-     *                 // can access this.state
-     *             }
-     *         }
+     *     void close() {
+     *         // can access this.state

Review comment:
       `process()` has an overload below that needs an update, too. There are also some `transform()` variants you missed.
   
   Overall, 14 methods need to be updated; I counted only 8 that this PR updates.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -1162,53 +1186,77 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
      * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is available by default):
+     * In order for the transformer to use state stores, the stores must be added to the topology and connected to the
+     * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
+     * access to global state stores is available by default).
+     * <p>
+     * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
+     * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
      *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
      *                 Serdes.String(),
      *                 Serdes.String());
-     * // register store
+     * // add store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
+     * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { public Transformer get() { return new MyTransformer(); } }, "myTransformState");

Review comment:
       As above (there are more instances below; I won't add comments again).

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
     <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
+                        processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
+                    <li>A predefined persistent key-value state store is added and connected to the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor it is connected to to the topology.
+                    This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
+                </p>
+                <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Topology</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic &quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">&quot;Source&quot;</span><span class="o">,</span> <span class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source processor as its upstream processor, along with its state store</span>
+    <span class="o">builder.addProcessor("Process", new ProcessorSupplier() { public Processor&lt;String, String&gt; get() { return new WordCountProcessor(); } public Set&lt;StoreBuilder&gt; stores() { return countStoreBuilder; } });</span>
+
+    <span class="c1">// add the sink processor node that takes Kafka topic &quot;sink-topic&quot; as output</span>
+    <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
+    <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
+</pre></div>
+                    <p>This allows for a processor to "own" state stores, effectively encapsulating its usage from the user wiring the topology.
+                        Multiple processors that share a state store may provide the same store with this technique, as long as the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> is the same instance.</p>

Review comment:
       nit: highlight `instance`

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
     <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
+                        processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
+                    <li>A predefined persistent key-value state store is added and connected to the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor it is connected to to the topology.

Review comment:
       Not sure what this means: `it is connected to to the topology.`

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Demonstrates, using a {@link Transformer} which combines the low-level Processor APIs with the high-level KStream DSL,
+ * how to implement the WordCount program that computes a simple word occurrence histogram from an input text.
+ * <p>
+ * <strong>Note: This is simplified code that only works correctly for single partition input topics.
+ * Check out {@link WordCountDemo} for a generic example.</strong>
+ * <p>
+ * In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ * <p>
+ * This example differs from {@link WordCountProcessorDemo} in that it uses a {@link Transformer} and {@link ConnectedStoreProvider}
+ * to define the application topology through a {@link StreamsBuilder}, which more closely resembles the high-level DSL.
+ * <p>
+ * Before running this example you must create the input topic and the output topic (e.g. via
+ * {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via
+ * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
+ */
+public final class WordCountTransformerDemo {
+
+    static class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
+
+        @Override
+        public Transformer<String, String, KeyValue<String, String>> get() {
+            return new Transformer<String, String, KeyValue<String, String>>() {
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public void init(final ProcessorContext context) {
+                    this.context = context;
+                    this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
+                        try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
+                            System.out.println("----------- " + timestamp + " ----------- ");
+
+                            while (iter.hasNext()) {
+                                final KeyValue<String, Integer> entry = iter.next();
+
+                                System.out.println("[" + entry.key + ", " + entry.value + "]");
+
+                                context.forward(entry.key, entry.value.toString());
+                            }
+                        }
+                    });
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+                }
+
+                @Override
+                public KeyValue<String, String> transform(final String dummy, final String line) {
+                    final String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
+
+                    for (final String word : words) {
+                        final Integer oldValue = this.kvStore.get(word);
+
+                        if (oldValue == null) {
+                            this.kvStore.put(word, 1);
+                        } else {
+                            this.kvStore.put(word, oldValue + 1);
+                        }
+                    }
+
+                    context.commit();

Review comment:
       I think we should remove `context.commit();` (Can we also remove it in `WordCountProcessorDemo`?)

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
##########
@@ -329,10 +329,11 @@ public void testAddStateStoreWithSink() {
     }
 
     @Test
-    public void testAddStateStoreWithDuplicates() {
+    public void testAddStateStoreWithDifferentInstances() {

Review comment:
       Can we improve the test name -> `shouldNotAllowToAddStoresWithSameName` (or similar)
   
   We should also add a test that verifies that adding the same `StoreBuilder` instance multiple times works.
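
   To make the two cases concrete, here is a minimal, self-contained sketch of the rule both tests would cover. It does not use the Kafka classes: `NamedBuilder` and `StoreRegistrySketch` are hypothetical stand-ins, and `IllegalStateException` is only a placeholder for whatever the real builder throws.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a named store builder; NOT the Kafka StoreBuilder interface.
final class NamedBuilder {
    final String name;

    NamedBuilder(final String name) {
        this.name = name;
    }
}

// Hypothetical registry modelling the rule under review:
// the same instance may be added repeatedly, a different instance with the same name may not.
final class StoreRegistrySketch {
    private final Map<String, NamedBuilder> builders = new HashMap<>();

    // Returns true if the builder was newly registered, false if this exact instance was already present.
    boolean add(final NamedBuilder builder) {
        final NamedBuilder existing = builders.get(builder.name);
        if (existing == null) {
            builders.put(builder.name, builder);
            return true;
        }
        if (existing == builder) {
            // Same instance shared by several processors: treat as a no-op.
            return false;
        }
        // Placeholder exception type, chosen for this sketch only.
        throw new IllegalStateException("A different store builder named '" + builder.name + "' was already added");
    }

    int size() {
        return builders.size();
    }

    public static void main(final String[] args) {
        final StoreRegistrySketch registry = new StoreRegistrySketch();
        final NamedBuilder counts = new NamedBuilder("Counts");
        registry.add(counts);
        registry.add(counts); // accepted: same instance
        try {
            registry.add(new NamedBuilder("Counts")); // rejected: same name, different instance
        } catch (final IllegalStateException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

   The comparison is deliberately by reference (`==`), not by name or `equals()`, which is what "same instance" means in the docs paragraph above.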

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -953,7 +977,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * flatTransform()}.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
-     * @param stateStoreNames     the names of the state stores used by the processor
+     * @param stateStoreNames     the names of the state stores used by the transformer, passed only if {@link ConnectedStoreProvider#stores()} is null

Review comment:
       Why this:
   `, passed only if {@link ConnectedStoreProvider#stores()} is null` 
   
   From the KIP: 
   > A user may continue to "connect" stores to a processor by passing stateStoreNames when calling stream.process/transform(...). This may be used in combination with a Supplier that provides its own state stores by implementing ConnectedStoreProvider::stores().

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Demonstrates, using a {@link Transformer} which combines the low-level Processor APIs with the high-level KStream DSL,
+ * how to implement the WordCount program that computes a simple word occurrence histogram from an input text.
+ * <p>
+ * <strong>Note: This is simplified code that only works correctly for single partition input topics.
+ * Check out {@link WordCountDemo} for a generic example.</strong>
+ * <p>
+ * In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ * <p>
+ * This example differs from {@link WordCountProcessorDemo} in that it uses a {@link Transformer} and {@link ConnectedStoreProvider}

Review comment:
       `uses a {@link Transformer} and {@link ConnectedStoreProvider}` -- this might be confusing because `TransformerSupplier extends ConnectedStoreProvider` and the code does not contain any direct reference to `ConnectedStoreProvider`... Can we rephrase it?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -885,49 +887,71 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
      * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is available by default):
-     * <pre>{@code

Review comment:
       This line must be kept to start the code example

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
     <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> with the pre-defined <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> logic is then added as the downstream
+                        processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node using the <code class="docutils literal"><span class="pre">addProcessor</span></code> method.</li>
+                    <li>A predefined persistent key-value state store is added and connected to the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor it is connected to to the topology.
+                    This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
+                </p>
+                <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Topology</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic &quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">&quot;Source&quot;</span><span class="o">,</span> <span class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source processor as its upstream processor, along with its state store</span>
+    <span class="o">builder.addProcessor("Process", new ProcessorSupplier() { public Processor&lt;String, String&gt; get() { return new WordCountProcessor(); } public Set&lt;StoreBuilder&gt; stores() { return countStoreBuilder; } });</span>
+
+    <span class="c1">// add the sink processor node that takes Kafka topic &quot;sink-topic&quot; as output</span>
+    <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
+    <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
+</pre></div>
+                    <p>This allows for a processor to "own" state stores, effectively encapsulating its usage from the user wiring the topology.
+                        Multiple processors that share a state store may provide the same store with this technique, as long as the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> is the same instance.</p>
+                    <p>In these topologies, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
+                        upstream processor of the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> node.  As a result, whenever the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched record from
+                        Kafka to its downstream <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and
+                        update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the
+                        <code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> processor node to
+                        the Kafka topic <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code>.  Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the
+                        same store name <code class="docutils literal"><span class="pre">&quot;Counts&quot;</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime,
+                        indicating that the state store cannot be found. If the state store is not associated with the processor
+                        in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
+                        runtime, indicating the state store is not accessible from this processor.</p>
+                    <p>Now that you have fully defined your processor topology in your application, you can proceed to
+                        <a class="reference internal" href="running-app.html#streams-developer-guide-execution"><span class="std std-ref">running the Kafka Streams application</span></a>.</p>
+                </div>
             </div>
-            <p>Here is a quick explanation of this example:</p>

Review comment:
       It seems you did not really change anything here? (Only the indentation? Why? It makes the diff hard to review.)

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -291,13 +291,19 @@ private void mockStoreBuilder() {
     }
 
     @Test
-    public void shouldNotAllowToAddStoreWithSameName() {
+    public void shouldNotAllowToAddStoreWithSameNameAndDifferentInstance() {

Review comment:
       We should add a new test `shouldAllowToShareStoreUsingSameStoreBuilder`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -885,49 +887,71 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
      * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is available by default):
-     * <pre>{@code
+     * In order for the transformer to use state stores, the stores must be added to the topology and connected to the
+     * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
+     * access to global state stores is available by default).
+     * <p>
+     * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
+     * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
      *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
      *                 Serdes.String(),
      *                 Serdes.String());
-     * // register store
+     * // add store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
+     * KStream outputStream = inputStream.transform(new TransformerSupplier() { public Transformer get() { return new MyTransformer(); } }, "myTransformState");

Review comment:
       Should be multiple lines to improve readability.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -885,49 +887,71 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
      * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is available by default):
-     * <pre>{@code
+     * In order for the transformer to use state stores, the stores must be added to the topology and connected to the
+     * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
+     * access to global state stores is available by default).
+     * <p>
+     * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
+     * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
      *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
      *                 Serdes.String(),
      *                 Serdes.String());
-     * // register store
+     * // add store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
+     * KStream outputStream = inputStream.transform(new TransformerSupplier() { public Transformer get() { return new MyTransformer(); } }, "myTransformState");
      * }</pre>
-     * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
+     * The second strategy is for the given {@link TransformerSupplier} to implement {@link ConnectedStoreProvider#stores()},
+     * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
+     * <pre>{@code
+     * class MyTransformerSupplier implements TransformerSupplier {
+     *     // supply transformer
+     *     Transformer get() {
+     *         return new MyTransformer();
+     *     }
+     *
+     *     // provide store(s) that will be added and connected to the associated transformer

Review comment:
       Should we add a comment to highlight that the store name `"myTransformState"` from the builder is used to access the store via the `context` later?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -1231,7 +1279,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
      * @param named               a {@link Named} config used to name the processor in the topology
-     * @param stateStoreNames     the names of the state stores used by the processor
+     * @param stateStoreNames     the names of the state stores used by the transformer, passed only if {@link ConnectedStoreProvider#stores()} is null

Review comment:
       As above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -723,6 +775,20 @@ public void process(final String key, final String value) {
         return () -> processor;
     }
 
+    private <K, V> ProcessorSupplier<K, V> defineWithStores(final Processor<K, V> processor, final Set<StoreBuilder> stores) {
+        return new ProcessorSupplier<K, V>() {
+            @Override
+            public Processor<K, V> get() {
+                return processor;

Review comment:
       This does not seem to be a good implementation? A supplier is supposed to return a new instance on each `get()` call. It might not be an issue now, but might be error prone in the future. 
   
   Maybe accept a supplier as input instead of a `Processor`.
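
   A rough illustration of the concern, in plain Java without the Kafka types. `CountingWorker`, `sharing()` and `fresh()` are hypothetical names invented for this sketch; `CountingWorker` merely stands in for a stateful `Processor`.

```java
import java.util.function.Supplier;

final class SupplierInstanceSketch {

    // Hypothetical stateful worker standing in for a Processor.
    static final class CountingWorker {
        int seen = 0;

        void process() {
            seen++;
        }
    }

    // Problematic shape: every get() hands out the one captured instance.
    static Supplier<CountingWorker> sharing(final CountingWorker worker) {
        return () -> worker;
    }

    // Suggested shape: accept a factory so that each get() yields a fresh instance.
    static Supplier<CountingWorker> fresh(final Supplier<CountingWorker> factory) {
        return factory::get;
    }

    public static void main(final String[] args) {
        final Supplier<CountingWorker> shared = sharing(new CountingWorker());
        shared.get().process();
        // State written through the first get() is visible through the second one.
        System.out.println("shared instance saw: " + shared.get().seen);

        final Supplier<CountingWorker> isolated = fresh(CountingWorker::new);
        isolated.get().process();
        // Each get() returns a new worker, so no state carries over.
        System.out.println("fresh instance saw: " + isolated.get().seen);
    }
}
```

   With the second shape, two processor nodes built from the same supplier cannot accidentally share per-instance state.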

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -213,6 +215,48 @@ public void testDrivingStatefulTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testDrivingConnectedStateStoreTopology() {
+        driver = new TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4"));
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("connectedStore");
+        assertEquals("value4", store.get("key1"));
+        assertEquals("value2", store.get("key2"));
+        assertEquals("value3", store.get("key3"));
+        assertNull(store.get("key4"));
+    }
+
+    @Test
+    public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
+        final String storeName = "connectedStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
+            .addProcessor("processor1", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor2", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source2")

Review comment:
       The same `StatefulProcessor` instance would be used in each processor step -- I would expect this to be problematic (cf. my comment below on `defineWithStores()`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
                 name,
                 new ProcessorParameters<>(processorSupplier, name),
-                stateStoreNames
+                getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
         );
 
         builder.addGraphNode(this.streamsGraphNode, processNode);
     }
 
+    /**
+     * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+     * 1) Store names are provided as arguments to process(...), transform(...), etc.
+     * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+     * {@link ConnectedStoreProvider#stores()}.  The {@link StoreBuilder}s will also be added to the topology.
+     */
+    private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
       Why not make `varargsStoreNames` a vararg? Would the `null` check below then be unnecessary?
   
   Why do we actually "unwrap" the stores here and add them one-by-one to the builder/topology?
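
   For reference, a small sketch of how Java varargs behave with respect to `null`. `VarargsSketch` and its helper methods are hypothetical and are not part of the PR:

   ```java
   public class VarargsSketch {

       // Hypothetical helper: returns how many store names were passed.
       static int countStoreNames(final String... storeNames) {
           return storeNames.length;
       }

       // Hypothetical helper: reports whether the varargs array itself is null.
       static boolean isNullArray(final String... storeNames) {
           return storeNames == null;
       }

       public static void main(final String[] args) {
           // No arguments: the compiler passes an empty array, never null.
           System.out.println(countStoreNames());                 // prints 0
           System.out.println(countStoreNames("a", "b"));         // prints 2
           System.out.println(isNullArray());                     // prints false
           // An explicit null array is still possible if a caller passes one.
           System.out.println(isNullArray((String[]) null));      // prints true
       }
   }
   ```

   So a vararg parameter removes the need for a `null` check only as long as no caller forwards an explicit `null` array.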

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -549,6 +593,14 @@ private Topology createStatefulTopology(final String storeName) {
             .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
+    private Topology createConnectedStateStoreTopology(final String storeName) {
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());

Review comment:
       nit: line too long

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
##########
@@ -55,61 +60,92 @@
     private final String topic = "stream";
     private final String stateStoreName = "myTransformState";
     private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
-    private final ForeachAction<Integer, Integer> action = (key, value) -> results.add(KeyValue.pair(key, value));
+    private final ForeachAction<Integer, Integer> accumulateExpected = (key, value) -> results.add(KeyValue.pair(key, value));
     private KStream<Integer, Integer> stream;
 
     @Before
     public void before() {
         builder = new StreamsBuilder();
-        final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
-                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
-                                            Serdes.Integer(),
-                                            Serdes.Integer());
-        builder.addStateStore(keyValueStoreBuilder);
         stream = builder.stream(topic, Consumed.with(Serdes.Integer(), Serdes.Integer()));
     }
 
+    private StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder() {
+        return Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.Integer());
+    }
+
     private void verifyResult(final List<KeyValue<Integer, Integer>> expected) {
         final ConsumerRecordFactory<Integer, Integer> recordFactory =
             new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
         final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(topic, Arrays.asList(new KeyValue<>(1, 1),
-                                                                       new KeyValue<>(2, 2),
-                                                                       new KeyValue<>(3, 3),
-                                                                       new KeyValue<>(2, 1),
-                                                                       new KeyValue<>(2, 3),
-                                                                       new KeyValue<>(1, 3))));
+                new KeyValue<>(2, 2),

Review comment:
       If you change the indentation, move the first `new KeyValue<>(1, 1)` to its own line.



