Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/29 19:21:51 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r432683735



##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -648,11 +648,12 @@ public synchronized Topology addSink(final String name,
      * If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
      * will be added to the topology and connected to this processor automatically.
      *
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link Processor} instance
-     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
-     * and process
-     * @return itself
+     * @param name          the unique name of the processor node
+     * @param supplier      the supplier used to construct this node's {@link Processor} instance; the implementation of supplier
+     *                      should return a newly constructed {@link Processor} instance inside the scope of the lambda expression.

Review comment:
       I think it's better to add this to the main JavaDocs above and keep the parameter description short. What about adding a sentence to the JavaDocs similar to the one suggested for the web docs? (Same for the DSL operator on `KStream`.)
   
   Also, we should not talk about lambdas, as we don't know if lambdas are used or not.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -842,7 +842,14 @@ public synchronized ProcessorTopology buildTopology() {
             nodeGroup.addAll(value);
         }
         nodeGroup.removeAll(globalNodeGroups());
-
+        for (final NodeFactory<?, ?> entry : nodeFactories.values()) {
+            if (entry instanceof ProcessorNodeFactory) {
+                ProcessorNodeFactory<?, ?> factory = (ProcessorNodeFactory<?, ?>) entry;
+                if (factory.supplier.get() == factory.supplier.get()) {
+                    throw new TopologyException("topology has singleton result of ProcessorSupplier " + factory.name);
+                }
+            }
+        }

Review comment:
       It might be best to extract this into a helper method and call it directly when `addProcessor()` is called. That way, the corresponding stack trace is easier to map to the actual call to `addProcessor()` that passed in the incorrect supplier.
   
   Similarly, we should add this check to `KStreamImpl#process()` and the other operators that take a supplier.
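
A minimal sketch of what such an extracted helper could look like. It is not the code from this PR: `java.util.function.Supplier` stands in for `ProcessorSupplier` so the example runs without Kafka on the classpath, and the names `SupplierCheck` and `checkSupplierReturnsNewInstances` are hypothetical. The check itself is the same reference comparison used in the diff above.

```java
import java.util.function.Supplier;

public class SupplierCheck {

    // Hypothetical helper (not an existing Kafka API): calls get() twice and
    // rejects the supplier if both calls return the same object reference.
    // Note that two throwaway instances are constructed by this check, and a
    // supplier returning null twice is rejected as well (null == null).
    public static <T> void checkSupplierReturnsNewInstances(final Supplier<T> supplier,
                                                            final String nodeName) {
        final T first = supplier.get();
        final T second = supplier.get();
        if (first == second) {
            throw new IllegalArgumentException(
                "Supplier for node " + nodeName + " returned the same instance twice; "
                    + "get() must return a new instance on every call.");
        }
    }

    public static void main(final String[] args) {
        // A supplier that constructs a fresh object per call passes the check.
        checkSupplierReturnsNewInstances(Object::new, "ok-node");

        // A supplier that hands out one shared object is rejected immediately.
        final Object shared = new Object();
        try {
            checkSupplierReturnsNewInstances(() -> shared, "bad-node");
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the helper throws at the point where it is invoked, calling it from the method that receives the supplier puts the user's own call site in the stack trace, which is the benefit described in the comment.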

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -200,6 +200,9 @@ public StateStore getStateStore(final String name) {
             }
 
             final String sendTo = toInternal.child();
+            if (currentNode() == null) {
+                throw new StreamsException("Current node is unknown when forwarding to: " + key);

Review comment:
       Why do we add the `key`? It does not seem useful and could leak data into the logs.
   
   We should also add a sentence about a potential root cause (even if we add other guards for it already in this PR):
   ```
   throw new StreamsException("Current node is unknown. This can happen if `forward()` is called in an illegal scope. The root cause could be that a `Processor` or `Transformer` instance is shared. To avoid this error, make sure that your suppliers return new instances each time `get()` is called and do not return the same object reference multiple times.");
   ```

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -439,6 +439,9 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
                 indicating that the state store cannot be found. If the state store is not associated with the processor
                 in the <code class="docutils literal"><span class="pre">Topology</span></code> code, accessing it in the processor&#8217;s <code class="docutils literal"><span class="pre">init()</span></code> method will also throw an exception at
                 runtime, indicating the state store is not accessible from this processor.</p>
+            <p>Note that there could be multiple <code class="docutils literal"><span class="pre">ProcessorContext</span></code> instances initialize your <code class="docutils literal"><span class="pre">Processor</span></code> during initialization.

Review comment:
       I think the first sentence might be too detailed? Maybe we can simplify:
   ```
   <p>Note that the `process()` function takes a `ProcessorSupplier` as argument, and that the supplier pattern requires that a new `Processor` instance is returned each time `get()` is called. Creating a single `Processor` object and returning the same object reference in `get()` would be a violation of the supplier pattern and lead to runtime exceptions.</p>
   ```
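
To illustrate why returning the same object reference is a problem, here is a small self-contained sketch. It does not use the Kafka Streams API: `CountingProcessor` is a hypothetical stand-in for a `Processor` that keeps per-instance state, and `java.util.function.Supplier` stands in for `ProcessorSupplier`. The assumption is only that the runtime calls `get()` more than once and initializes each result separately.

```java
import java.util.function.Supplier;

public class SharedProcessorDemo {

    // Hypothetical stand-in for a Processor holding per-instance state.
    public static class CountingProcessor {
        private String taskId;
        private int count;

        public void init(final String taskId) {
            this.taskId = taskId;
            this.count = 0;
        }

        public void process() {
            count++;
        }

        public String taskId() {
            return taskId;
        }

        public int count() {
            return count;
        }
    }

    public static void main(final String[] args) {
        // Violates the supplier pattern: every get() returns the same object.
        final CountingProcessor shared = new CountingProcessor();
        final Supplier<CountingProcessor> wrong = () -> shared;

        final CountingProcessor a = wrong.get();
        a.init("task-0");
        final CountingProcessor b = wrong.get();
        b.init("task-1"); // overwrites the state set up for task-0
        a.process();
        System.out.println(a.taskId()); // prints "task-1"
        System.out.println(b.count());  // prints "1"

        // Follows the supplier pattern: every get() constructs a new object.
        final Supplier<CountingProcessor> right = CountingProcessor::new;

        final CountingProcessor c = right.get();
        c.init("task-0");
        final CountingProcessor d = right.get();
        d.init("task-1");
        c.process();
        System.out.println(c.taskId()); // prints "task-0"
        System.out.println(d.count());  // prints "0"
    }
}
```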

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -2523,7 +2523,8 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * for each input record, it is recommended to use {@link #flatTransform(TransformerSupplier, String...)
      * flatTransform()}.
      *
-     * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
+     * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a newly constructed
+     *                            {@link Transformer}

Review comment:
       This is nice!



