Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:02:50 UTC

[GitHub] [kafka] sneakyburro commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

sneakyburro commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r437317697



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -842,7 +842,14 @@ public synchronized ProcessorTopology buildTopology() {
             nodeGroup.addAll(value);
         }
         nodeGroup.removeAll(globalNodeGroups());
-
+        for (final NodeFactory<?, ?> entry : nodeFactories.values()) {
+            if (entry instanceof ProcessorNodeFactory) {
+                ProcessorNodeFactory<?, ?> factory = (ProcessorNodeFactory<?, ?>) entry;
+                if (factory.supplier.get() == factory.supplier.get()) {
+                    throw new TopologyException("topology has singleton result of ProcessorSupplier " + factory.name);
+                }
+            }
+        }

Review comment:
       That's a great idea. I introduced a utility class, `TopologyUtil`, under the processor package.
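
       For reference, the identity check in the hunk above can be sketched in isolation. This is only an illustration, not the code in the PR: it uses `java.util.function.Supplier` as a stand-in for `ProcessorSupplier`, and the class name `SingletonSupplierCheck`, its two methods, and the use of `IllegalStateException` in place of `TopologyException` are all hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the check in the diff; not the Kafka implementation.
final class SingletonSupplierCheck {

    // True when two consecutive get() calls return the very same object,
    // which is the condition the diff treats as an error.
    static <T> boolean returnsSameInstance(final Supplier<T> supplier) {
        return supplier.get() == supplier.get();
    }

    // Fail fast, as the diff does at topology build time, if the supplier
    // hands out a shared instance. IllegalStateException is an assumption here.
    static <T> void requireFreshInstances(final String name, final Supplier<T> supplier) {
        if (returnsSameInstance(supplier)) {
            throw new IllegalStateException("supplier for " + name + " returns a singleton");
        }
    }

    public static void main(final String[] args) {
        final Object shared = new Object();
        final Supplier<Object> singleton = () -> shared; // same object on every call
        final Supplier<Object> fresh = Object::new;      // new object on every call

        System.out.println("singleton detected: " + returnsSameInstance(singleton));
        System.out.println("fresh detected as singleton: " + returnsSameInstance(fresh));
    }
}
```

       One caveat with this kind of check: it calls `get()` twice, so a supplier with side effects or an expensive constructor pays that cost during validation.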

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -439,6 +439,9 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
                 indicating that the state store cannot be found. If the state store is not associated with the processor
                 in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
                 runtime, indicating the state store is not accessible from this processor.</p>
+            <p>Note that there could be multiple <code class="docutils literal"><span class="pre">ProcessorContext</span></code> instances initialize your <code class="docutils literal"><span class="pre">Processor</span></code> during initialization.

Review comment:
       Adopted and revised a little. Thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -648,11 +648,12 @@ public synchronized Topology addSink(final String name,
      * If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
      * will be added to the topology and connected to this processor automatically.
      *
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link Processor} instance
-     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
-     * and process
-     * @return itself
+     * @param name          the unique name of the processor node
+     * @param supplier      the supplier used to construct this node's {@link Processor} instance; the implementation of supplier
+     *                      should return a newly constructed {@link Processor} instance inside the scope of the lambda expression.

Review comment:
       I reverted the Javadoc change on the parameter and added text in the style of the web docs to the main Javadoc for this function.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -200,6 +200,9 @@ public StateStore getStateStore(final String name) {
             }
 
             final String sendTo = toInternal.child();
+            if (currentNode() == null) {
+                throw new StreamsException("Current node is unknown when forwarding to: " + key);

Review comment:
       Made the change!



