Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/29 04:50:55 UTC

[GitHub] [kafka] sneakyburro opened a new pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

sneakyburro opened a new pull request #8752:
URL: https://github.com/apache/kafka/pull/8752


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   https://stackoverflow.com/questions/61790984/kafka-stream-forward-method-throwing-nullpointerexception-because-processornode/61978396
   A user-provided ProcessorSupplier may always return the same instance. Multiple ProcessorContexts then try to register with the same processor instance, but a processor instance can hold only one ProcessorContext to call back into. As a result, when the StreamThread that owns each ProcessorContext dispatches records to the shared processor instance, the callback can go to a ProcessorContext other than the thread's own, which leads to inconsistency.
   An example of incorrect usage:
   ```
   .addProcessor("Process", () -> fileExtractProcessorObject , "sourceProcessor")
   ```
   
   So this commit:
   * checks for violations of our **Supplier** pattern during initialization.
   * improves the JavaDocs.
   * adds a section to the official documentation to explicitly remind users.
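
The failure mode described above can be illustrated without Kafka at all. The following is a minimal sketch in plain Java; `Context`, `RecordingProcessor`, and `SharedInstanceDemo` are hypothetical stand-ins (not Kafka classes) for `ProcessorContext`, a user `Processor`, and the per-task wiring. It assumes only that a processor keeps the context passed to its most recent `init()` call.

```java
import java.util.function.Supplier;

public class SharedInstanceDemo {

    // Hypothetical stand-in for ProcessorContext: just carries the owning task's name.
    static final class Context {
        final String owner;
        Context(final String owner) { this.owner = owner; }
    }

    // Hypothetical stand-in for a user Processor: remembers the last context it was given.
    static final class RecordingProcessor {
        private Context context;
        void init(final Context context) { this.context = context; }
        // Returns the owner of the context that a forward() call would go through.
        String forwardTarget() { return context.owner; }
    }

    // Mimics two tasks that each ask the supplier for "their" processor and initialize it.
    static String[] forwardTargets(final Supplier<RecordingProcessor> supplier) {
        final RecordingProcessor first = supplier.get();
        first.init(new Context("task-0"));
        final RecordingProcessor second = supplier.get();
        second.init(new Context("task-1"));
        return new String[] {first.forwardTarget(), second.forwardTarget()};
    }

    public static void main(final String[] args) {
        final RecordingProcessor shared = new RecordingProcessor();
        // Incorrect: every get() returns the same object, so the second init() overwrites the first context.
        System.out.println(String.join(",", forwardTargets(() -> shared)));
        // Correct: every get() constructs a new object, so each task keeps its own context.
        System.out.println(String.join(",", forwardTargets(RecordingProcessor::new)));
    }
}
```

With the shared instance, both tasks end up forwarding through the context of `task-1`; with the constructor reference, each task forwards through its own.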
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   Unit test behavior should remain the same as before, except for incorrect usages.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r438381511



##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -645,14 +645,17 @@ public synchronized Topology addSink(final String name,
      * Add a new processor node that receives and processes records output by one or more parent source or processor
      * node.
      * Any new record output by this processor will be forwarded to its child processor or sink nodes.
+     * The supplier should always generate a new instance each time invoking {@link  ProcessorSupplier#get()}. Creating
+     * a single Processor object and returning the same object reference in {@link ProcessorSupplier#get()} would be
+     * a violation of the supplier pattern and leads to runtime exceptions.

Review comment:
       Can we add this to all other methods, too? (ie, all that take a supplier, Processor or Transformer?)
   
   And maybe also to `ProcessorSupplier#get()` and `TransformerSupplier#get()` -- I think it would be best to provide redundancy :)







[GitHub] [kafka] sneakyburro commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
sneakyburro commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-642337042


   > Failed with checkstyle error:
   > 
   > ```
   > Task :rat FAILED
   > 16:15:25 Unknown license: /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamUtil.java
   > 16:15:25 Unknown license: /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyUtil.java
   > ```
   > 
   > You need to add the license header to the new files.
   
   @mjsax I'll finish test cases and fix errors tonight.





[GitHub] [kafka] sneakyburro commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
sneakyburro commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r438391769



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -1421,6 +1422,7 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
         Objects.requireNonNull(named, "named can't be null");
         Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
+        TopologyUtil.checkProcessorSupplier(processorSupplier);

Review comment:
       That's a good idea. Make the change now.







[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r438420494



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -2893,6 +2904,9 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
      * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
      * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * The supplier should always generate a new instance each time invoking {@link TransformerSupplier#get()}. Creating
+     * a single Transformer object and returning the same object reference in {@link TransformerSupplier#get()} would be
+     * a violation of the supplier pattern and leads to runtime exceptions.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}

Review comment:
       ```suggestion
        * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a newly constructed {@link Transformer}
   ```







[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r438384667



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -1421,6 +1422,7 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
         Objects.requireNonNull(named, "named can't be null");
         Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
+        TopologyUtil.checkProcessorSupplier(processorSupplier);

Review comment:
       We should add a similar check for `TransformerSuppliers` -- This check must be done in the DSL code though, as `Transformers` are a higher level concept. I checked the code and all transformers are passed through https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L1251 -- so we can add the check there.







[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-642305909









[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r438379682



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -439,6 +439,11 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
                 indicating that the state store cannot be found. If the state store is not associated with the processor
                 in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
                 runtime, indicating the state store is not accessible from this processor.</p>
+            <p>Note that the <code class="docutils literal"><span class="pre">Topology#addProcessor</span></code> function takes a <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> as argument, and that the supplier pattern requires that a new
+                <code class="docutils literal"><span class="pre">Processor</span></code> instance is return each time <code class="docutils literal"><span class="pre">ProcessorSupplier#get()</span></code> is called. Creating a single <code class="docutils literal"><span class="pre">Processor</span></code>

Review comment:
       ```suggestion
                   <code class="docutils literal"><span class="pre">Processor</span></code> instance is returned each time <code class="docutils literal"><span class="pre">ProcessorSupplier#get()</span></code> is called. Creating a single <code class="docutils literal"><span class="pre">Processor</span></code>
   ```







[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-642307843


   Retest this please.





[GitHub] [kafka] mjsax closed pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax closed pull request #8752:
URL: https://github.com/apache/kafka/pull/8752


   





[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r438420782



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -3914,7 +3928,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
      * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
      *
-     * @param processorSupplier a instance of {@link ProcessorSupplier} that generates a {@link Processor}
+     * @param processorSupplier a instance of {@link ProcessorSupplier} that generates a newly constructed {@link Processor}

Review comment:
       Add the paragraph also here (ie, to the JavaDocs above)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -4005,7 +4019,7 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
      * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
      *
-     * @param processorSupplier a instance of {@link ProcessorSupplier} that generates a {@link Processor}
+     * @param processorSupplier a instance of {@link ProcessorSupplier} that generates a newly constructed {@link Processor}

Review comment:
       Add the paragraph also here (ie, to the JavaDocs above)







[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r432683735



##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -648,11 +648,12 @@ public synchronized Topology addSink(final String name,
      * If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
      * will be added to the topology and connected to this processor automatically.
      *
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link Processor} instance
-     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
-     * and process
-     * @return itself
+     * @param name          the unique name of the processor node
+     * @param supplier      the supplier used to construct this node's {@link Processor} instance; the implementation of supplier
+     *                      should return a newly constructed {@link Processor} instance inside the scope of the lambda expression.

Review comment:
       I think it's better to add this in the main java docs above and keep the parameter description short. What about adding a similar sentence as suggested for the web-docs into the JavaDocs? (Same for the DSL operator on `KStream`)
   
   Also, we should not talk about lambdas, as we don't know if lambdas are used or not.
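
The point about lambdas can be made concrete: a supplier may be written as a lambda, a constructor reference, or a named class, and the contract is the same in each case. The sketch below uses `java.util.function.Supplier` and a hypothetical `Worker` class as stand-ins for `ProcessorSupplier` and `Processor`; none of these names come from the PR.

```java
import java.util.function.Supplier;

public class SupplierForms {

    // Hypothetical stand-in for a Processor implementation.
    static final class Worker { }

    // A supplier written as a named class rather than a lambda.
    static final class WorkerSupplier implements Supplier<Worker> {
        @Override
        public Worker get() {
            return new Worker(); // constructed inside get(), so each call yields a new object
        }
    }

    // True if two consecutive get() calls return different object references.
    static boolean returnsDistinctInstances(final Supplier<?> supplier) {
        return supplier.get() != supplier.get();
    }

    public static void main(final String[] args) {
        final Supplier<Worker> lambda = () -> new Worker();
        final Supplier<Worker> constructorRef = Worker::new;
        final Supplier<Worker> namedClass = new WorkerSupplier();

        final Worker cached = new Worker();
        final Supplier<Worker> cachingLambda = () -> cached; // violates the contract

        System.out.println(returnsDistinctInstances(lambda));
        System.out.println(returnsDistinctInstances(constructorRef));
        System.out.println(returnsDistinctInstances(namedClass));
        System.out.println(returnsDistinctInstances(cachingLambda));
    }
}
```

Only the last supplier breaks the contract, and it does so because of what it returns, not because it is a lambda. This is why wording the documentation in terms of `get()` rather than lambdas covers all three forms.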

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -842,7 +842,14 @@ public synchronized ProcessorTopology buildTopology() {
             nodeGroup.addAll(value);
         }
         nodeGroup.removeAll(globalNodeGroups());
-
+        for (final NodeFactory<?, ?> entry : nodeFactories.values()) {
+            if (entry instanceof ProcessorNodeFactory) {
+                ProcessorNodeFactory<?, ?> factory = (ProcessorNodeFactory<?, ?>) entry;
+                if (factory.supplier.get() == factory.supplier.get()) {
+                    throw new TopologyException("topology has singleton result of ProcessorSupplier " + factory.name);
+                }
+            }
+        }

Review comment:
       It might be best to extract this into a helper method and call it directly when `addProcessor()` is called. That way, the corresponding stack trace is easier to map to the actual call to `addProcessor()` that passed in the incorrect supplier.
   
   Similarly, we should add this check to `KStreamImpl#process()` and others
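
One way to read this suggestion is to run the identity check in a small helper at the moment the supplier is registered, so that the exception's stack trace points at the offending call site. The sketch below is an assumption about the shape of such a helper, not the code from the PR: `SupplierChecks`, `requireFreshInstances`, and `Registry` are hypothetical names, and `IllegalArgumentException` stands in for Kafka's `TopologyException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

public class SupplierChecks {

    // Throws if two consecutive get() calls return the same object reference.
    // Note: this constructs two throwaway instances, so it assumes construction is cheap
    // and free of side effects.
    static <T> void requireFreshInstances(final Supplier<T> supplier, final String nodeName) {
        Objects.requireNonNull(supplier, "supplier can't be null");
        if (supplier.get() == supplier.get()) {
            throw new IllegalArgumentException(
                "Supplier for node " + nodeName + " returned the same instance twice; "
                    + "it must return a new instance each time get() is called.");
        }
    }

    // Hypothetical stand-in for a topology builder that validates eagerly.
    static final class Registry {
        private final Map<String, Supplier<?>> suppliers = new LinkedHashMap<>();

        Registry addProcessor(final String name, final Supplier<?> supplier) {
            requireFreshInstances(supplier, name); // fails here, inside the caller's addProcessor() call
            suppliers.put(name, supplier);
            return this;
        }

        int size() {
            return suppliers.size();
        }
    }

    public static void main(final String[] args) {
        final Registry registry = new Registry();
        registry.addProcessor("fresh", Object::new);

        final Object shared = new Object();
        try {
            registry.addProcessor("shared", () -> shared);
        } catch (final IllegalArgumentException e) {
            System.out.println("Rejected at registration: " + e.getMessage());
        }
    }
}
```

Because the helper is generic over the supplied type, the same sketch would apply to a transformer supplier. The trade-off is that validation calls `get()` twice before the topology is ever built.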

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -200,6 +200,9 @@ public StateStore getStateStore(final String name) {
             }
 
             final String sendTo = toInternal.child();
+            if (currentNode() == null) {
+                throw new StreamsException("Current node is unknown when forwarding to: " + key);

Review comment:
       Why do we add the `key`? It does not seem to be useful and could potentially leak data into logs.
   
   We should also add a sentence about a potential root cause (even if we add other guards for it already in this PR):
   ```
   throw new StreamsException("Current node is unknown. This can happen if `forward()` is called in an illegal scope. The root cause could be that a `Processor` or `Transformer` instance is shared. To avoid this error, make sure that your suppliers return new instances each time `get()` is called and do not return the same object reference multiple times.");
   ```
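
Both points in this comment (no record key in the message, and a hint at the likely root cause) can be sketched as follows. This is a simplified illustration, not Kafka's `ProcessorContextImpl`: `ForwardGuard` and its members are hypothetical, and `IllegalStateException` stands in for `StreamsException`.

```java
public class ForwardGuard {

    // Hypothetical stand-in for the context's notion of the node currently being processed.
    private String currentNode;

    void setCurrentNode(final String currentNode) {
        this.currentNode = currentNode;
    }

    // Forwards a record; the key and value are deliberately never written into exception messages.
    String forward(final Object key, final Object value) {
        if (currentNode == null) {
            throw new IllegalStateException(
                "Current node is unknown. This can happen if forward() is called in an illegal scope. "
                    + "The root cause could be that a Processor or Transformer instance is shared. "
                    + "Make sure that your suppliers return new instances each time get() is called.");
        }
        return currentNode + " forwarded a record";
    }

    public static void main(final String[] args) {
        final ForwardGuard guard = new ForwardGuard();
        try {
            guard.forward("secret-key", "payload");
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        guard.setCurrentNode("Process");
        System.out.println(guard.forward("secret-key", "payload"));
    }
}
```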

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -439,6 +439,9 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
                 indicating that the state store cannot be found. If the state store is not associated with the processor
                 in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
                 runtime, indicating the state store is not accessible from this processor.</p>
+            <p>Note that there could be multiple <code class="docutils literal"><span class="pre">ProcessorContext</span></code> instances initialize your <code class="docutils literal"><span class="pre">Processor</span></code> during initialization.

Review comment:
       I think the first sentence might be too detailed? Maybe we can simplify:
   ```
   <p>Note that the `process()` function takes a `ProcessorSupplier` as argument, and that the supplier pattern requires that a new `Processor` instance is return each time `get()` is called. Creating a single `Processor` object and returning the same object reference in `get()` would be violation of the supplier pattern and leads to runtime exceptions.</p>
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -2523,7 +2523,8 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * for each input record, it is recommended to use {@link #flatTransform(TransformerSupplier, String...)
      * flatTransform()}.
      *
-     * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
+     * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a newly constructed
+     *                            {@link Transformer}

Review comment:
       This is nice!







[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r438420782



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -3914,7 +3928,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
      * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
      *
-     * @param processorSupplier a instance of {@link ProcessorSupplier} that generates a {@link Processor}
+     * @param processorSupplier a instance of {@link ProcessorSupplier} that generates a newly constructed {@link Processor}

Review comment:
       Add the paragraph also here







[GitHub] [kafka] sneakyburro commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
sneakyburro commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-640936812


   > @sneakyburro -- Just a heads up: code freeze for 2.6 release is on Wednesday. If you want to get the PR into the release, please address the review comments soon. Otherwise, this will slip and go into 2.7.
   
   I'll update the PR tonight.





[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-640935840


   @sneakyburro -- Just a heads up: code freeze for 2.6 release is on Wednesday. If you want to get the PR into the release, please address the review comments soon. Otherwise, this will slip and go into 2.7.





[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-642305845


   Retest this pleasel





[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-642306024


   Retest this please.





[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-668326105


   Closing this PR in favor of #9000 





[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r438420374



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -2770,6 +2778,9 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
      * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
      * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * The supplier should always generate a new instance each time invoking {@link TransformerSupplier#get()}. Creating
+     * a single Transformer object and returning the same object reference in {@link TransformerSupplier#get()} would be
+     * a violation of the supplier pattern and leads to runtime exceptions.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}

Review comment:
       ```suggestion
        * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a newly constructed {@link Transformer}
   ```







[GitHub] [kafka] sneakyburro commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
sneakyburro commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r437317697



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -842,7 +842,14 @@ public synchronized ProcessorTopology buildTopology() {
             nodeGroup.addAll(value);
         }
         nodeGroup.removeAll(globalNodeGroups());
-
+        for (final NodeFactory<?, ?> entry : nodeFactories.values()) {
+            if (entry instanceof ProcessorNodeFactory) {
+                ProcessorNodeFactory<?, ?> factory = (ProcessorNodeFactory<?, ?>) entry;
+                if (factory.supplier.get() == factory.supplier.get()) {
+                    throw new TopologyException("topology has singleton result of ProcessorSupplier " + factory.name);
+                }
+            }
+        }

Review comment:
       That's a great idea. I introduced a util class `TopologyUtil` under processor package.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -439,6 +439,9 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
                 indicating that the state store cannot be found. If the state store is not associated with the processor
                 in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
                 runtime, indicating the state store is not accessible from this processor.</p>
+            <p>Note that there could be multiple <code class="docutils literal"><span class="pre">ProcessorContext</span></code> instances initialize your <code class="docutils literal"><span class="pre">Processor</span></code> during initialization.

Review comment:
       Adopted and revised a little. Thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -648,11 +648,12 @@ public synchronized Topology addSink(final String name,
      * If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
      * will be added to the topology and connected to this processor automatically.
      *
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link Processor} instance
-     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
-     * and process
-     * @return itself
+     * @param name          the unique name of the processor node
+     * @param supplier      the supplier used to construct this node's {@link Processor} instance; the implementation of supplier
+     *                      should return a newly constructed {@link Processor} instance inside the scope of the lambda expression.

Review comment:
       I reverted the JavaDocs on the parameter and added a paragraph, similar to the web docs, to the main JavaDocs for this function.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -200,6 +200,9 @@ public StateStore getStateStore(final String name) {
             }
 
             final String sendTo = toInternal.child();
+            if (currentNode() == null) {
+                throw new StreamsException("Current node is unknown when forwarding to: " + key);

Review comment:
       Made the change!







[GitHub] [kafka] mjsax removed a comment on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax removed a comment on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-642305845


   Retest this pleasel





[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-646367241


   @sneakyburro Any update on this PR?





[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-642327369


   Failed with checkstyle error:
   ```
   Task :rat FAILED
   16:15:25 Unknown license: /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamUtil.java
   16:15:25 Unknown license: /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyUtil.java
   ```
   
   You need to add the license header to the new files.

