Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/07 22:17:54 UTC

[GitHub] [kafka] vvcephei opened a new pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

vvcephei opened a new pull request #10994:
URL: https://github.com/apache/kafka/pull/10994


   Since the old Processor API is now deprecated, we need to
   update the documentation to steer people to the new API.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668841380



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>

Review comment:
       ```suggestion
               methods handle records in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
   ```
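A minimal sketch of the four type parameters described in this hunk. It assumes the `org.apache.kafka.streams.processor.api` package targeted by this PR. The class name `WordLengthProcessor` is hypothetical. The processor reads `String` keys and values and forwards `String` keys with `Integer` values, so `KOut, VOut` are `String, Integer`:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical example: KIn = String, VIn = String, KOut = String, VOut = Integer.
class WordLengthProcessor implements Processor<String, String, String, Integer> {
    private ProcessorContext<String, Integer> context;

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        // Keep the context so process() can forward records.
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        final int length = record.value() == null ? 0 : record.value().length();
        // withValue() copies the key, timestamp, and headers and swaps in the new value.
        // The compiler only accepts a Record whose key/value types fit KOut/VOut.
        context.forward(record.withValue(length));
    }
}
```

Because the output types are part of the class signature, a mismatch between what the processor forwards and what it declares is caught at compile time rather than at runtime.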







[GitHub] [kafka] mjsax commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668331269



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties

Review comment:
       `properties` -> `fields` (or `elements`) ?
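To make the shallow-copy caveat in this paragraph concrete, here is a small sketch. It assumes the `Record` class from `org.apache.kafka.streams.processor.api`. The class `ShallowCopyDemo` and the use of a mutable `List` as the key are illustrative only. The builder method returns a new `Record`, but that record references the same key object as the input:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Record;

// Illustrative only: shows that with*() copies the Record wrapper, not its fields.
class ShallowCopyDemo {
    // Returns [inputRecord, outputRecord] so callers can inspect both.
    static List<Record<List<String>, String>> demo() {
        final List<String> key = new ArrayList<>(List.of("a"));
        final Record<List<String>, String> input = new Record<>(key, "old", 100L);
        // withValue() builds a new Record; the input record keeps its own value.
        final Record<List<String>, String> output = input.withValue("new");
        // Mutating the shared key object is visible through BOTH records.
        key.add("b");
        return List.of(input, output);
    }
}
```

If a processor mutates keys, values, or headers after forwarding, copying those objects first avoids surprising downstream processors.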

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:

Review comment:
       `key` -> `main` (to avoid confusion with "record key") ?
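To illustrate the four main components (and why "key" is ambiguous in that sentence), here is a sketch that builds a `Record` from scratch with the four-argument constructor. It assumes `RecordHeaders` from `org.apache.kafka.common.header.internals`. The class name and the `"source"` header are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.Record;

class RecordComponentsDemo {
    static Record<String, Long> build() {
        // Hypothetical header recording where the record came from.
        final Headers headers = new RecordHeaders()
            .add("source", "demo".getBytes(StandardCharsets.UTF_8));
        // The four components: key, value, timestamp (epoch millis), headers.
        return new Record<>("page-42", 7L, 1_625_700_000_000L, headers);
    }
}
```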

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
-                    If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams processes records.
+                  If there are no records to process, or if Streams is waiting for new records
+                  due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
       `stream time` -> `stream-time` 
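The attention box links `STREAM_TIME` punctuation to task idling. For reference, here is a sketch of setting that option. `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG` maps to `max.task.idle.ms`. The application id, bootstrap address, and the 500 ms value are placeholder assumptions:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class TaskIdlingConfig {
    static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "punctuation-demo");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Allow a task to stay idle for up to 500 ms while waiting for data on its
        // input partitions. While it waits, no records are processed, so stream-time
        // (and therefore STREAM_TIME punctuation) does not advance.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
        return props;
    }
}
```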

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called "punctuation")
+              in your processor's <code class="docutils literal"><span class="pre">init()</span></code>

Review comment:
       You can also create a punctuation within `process()`.
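Following up on the point above, here is a sketch of scheduling a punctuation lazily from `process()` rather than from `init()`. It assumes `ProcessorContext#schedule(Duration, PunctuationType, Punctuator)` and the `Cancellable` it returns. The class name and the one-minute interval are hypothetical. The null check ensures that only one punctuator is registered:

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical processor that only starts punctuating once it has seen a record.
class LazyPunctuationProcessor implements Processor<String, String, Void, Void> {
    private ProcessorContext<Void, Void> context;
    private Cancellable punctuation;   // null until the first record arrives

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        this.context = context;
        // Streams may call init() again after close(), so reset per-instance state here.
        this.punctuation = null;
    }

    @Override
    public void process(final Record<String, String> record) {
        if (punctuation == null) {
            // schedule() can be called from process(), not only from init().
            punctuation = context.schedule(
                Duration.ofMinutes(1),
                PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.commit());  // request a commit once per minute
        }
        // ... per-record logic would go here ...
    }

    @Override
    public void close() {
        if (punctuation != null) {
            punctuation.cancel();
        }
    }
}
```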

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
+    private KeyValueStore&lt;String, Integer&gt; kvStore;
 
-      // retrieve the key-value store named &quot;Counts&quot;
-      kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
+    @Override
+    public void init(final ProcessorContext&lt;String, String> context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {

Review comment:
       `->` -> `-&gt;` ?
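On the `-&gt;` question: a bare `>` in HTML text content is tolerated by browsers. However, this page already escapes angle brackets elsewhere (for example `Processor&lt;String, String&gt;`), so escaping lambda arrows keeps the page consistent. The line under review also mixes `&lt;` with a bare `>`. Here is a minimal sketch of escaping a Java snippet before pasting it into the HTML docs. The helper name is hypothetical:

```java
// Hypothetical helper for embedding Java source inside <pre><code> in the docs.
class HtmlCodeEscaper {
    static String escape(final String code) {
        // Escape '&' first so the entities produced for '<' and '>' are not double-escaped.
        return code.replace("&", "&amp;")
                   .replace("<", "&lt;")
                   .replace(">", "&gt;");
    }
}
```

For example, `escape("timestamp -> {")` yields `timestamp -&gt; {`.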

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>

Review comment:
       `precords` -> `records`







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r665744745



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>

Review comment:
       The whole time-handling situation is slightly different now that there's no implicit timestamp inheritance during `process()`. I reframed this whole section to document, first, the role of the input and output type bounds; second, the new Record class; and finally, what happens during punctuation.
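To illustrate the point about timestamps: with the new API, the output timestamp is whatever the forwarded `Record` carries. In `process()`, `record.withValue(...)` keeps the input timestamp. In a punctuator there is no input record, so the timestamp has to be set explicitly. The sketch below sets it to the value passed to the punctuator. The class name, the `"heartbeat"` record, and the 10-second interval are hypothetical:

```java
import java.time.Duration;
import java.util.Locale;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical processor showing where output timestamps come from.
class TimestampAwareProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME,
            // No input record exists here, so the output timestamp is chosen explicitly.
            timestamp -> context.forward(heartbeat(timestamp)));
    }

    @Override
    public void process(final Record<String, String> record) {
        final String upper = record.value() == null ? null : record.value().toUpperCase(Locale.ROOT);
        // withValue() keeps the input record's key, timestamp, and headers.
        context.forward(record.withValue(upper));
    }

    // Builds the record forwarded on each punctuation.
    static Record<String, String> heartbeat(final long punctuationTimestamp) {
        return new Record<>("heartbeat", "alive", punctuationTimestamp);
    }
}
```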

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
                 <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code>
                     interface:</p>
                 <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address
-public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
+public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, Void&gt; {

Review comment:
       Adding the forward type bound.
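For comparison, here is a sketch of a sink-only processor with both output types set to `Void`. With this signature, the compiler rejects forwarding any record that has a non-null key or value. The key type is simplified to `String` instead of the docs' `PageId`. The class name, threshold, and `Consumer<String>` alert sink are hypothetical stand-ins for the docs' email logic:

```java
import java.util.function.Consumer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical sink-only processor: KOut and VOut are Void because it never forwards.
class PopularPageAlertSketch implements Processor<String, Long, Void, Void> {
    private final long threshold;
    private final Consumer<String> alertSink;  // hypothetical stand-in for an email client

    PopularPageAlertSketch(final long threshold, final Consumer<String> alertSink) {
        this.threshold = threshold;
        this.alertSink = alertSink;
    }

    @Override
    public void process(final Record<String, Long> record) {
        // Side effect only; nothing is forwarded downstream.
        if (record.value() != null && record.value() >= threshold) {
            alertSink.accept("Popular page: " + record.key() + " (" + record.value() + " views)");
        }
    }
}
```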

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
-                    If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams processes records.
+                  If there are no records to process, or if Streams is waiting for new records
+                  due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
       I happened to notice that the old docs here were outdated. Streams no longer plays those games with stream time. It's just computed as the max timestamp of any record processed by the task. But the key point still stands, that people should be aware that stream time doesn't advance unless we process records.
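A toy model can make the "stream time only advances when records are processed" point concrete. `StreamTimeModel` below is hypothetical and deliberately simplified. It tracks stream time as the maximum timestamp processed so far, and it only fires a stream-time punctuation from inside `process()`. The alignment of punctuation times to interval boundaries, and the skipping of missed intervals, are assumptions of this sketch, not a description of the Kafka Streams scheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of stream-time punctuation; NOT the Kafka Streams implementation.
class StreamTimeModel {
    private final long intervalMs;
    private long streamTime = Long.MIN_VALUE; // unknown until the first record is processed
    private long nextPunctuation = 0L;        // assumption: schedule aligned to time 0
    final List<Long> punctuations = new ArrayList<>();

    StreamTimeModel(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Stream time only moves here, i.e. only when a record is processed.
    void process(long recordTimestamp) {
        // max timestamp seen so far; an out-of-order record never moves stream time backwards
        streamTime = Math.max(streamTime, recordTimestamp);
        if (streamTime >= nextPunctuation) {
            punctuations.add(streamTime); // punctuate once, at the current stream time
            // next boundary strictly after the current stream time, so missed intervals are skipped
            nextPunctuation = (streamTime / intervalMs + 1) * intervalMs;
        }
    }

    long streamTime() {
        return streamTime;
    }

    public static void main(String[] args) {
        StreamTimeModel model = new StreamTimeModel(1000L);
        for (long ts : new long[] {1000L, 1500L, 1200L, 2100L}) {
            model.process(ts);
        }
        System.out.println(model.streamTime()); // 2100
        System.out.println(model.punctuations); // [1000, 2100]
        // With no further records, stream time stays at 2100 and no more
        // punctuations fire, no matter how much wall-clock time passes.
    }
}
```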

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
                 <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code>
                     interface:</p>
                 <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address
-public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
+public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, Void&gt; {
 
   private final String emailAddress;
-  private ProcessorContext context;
+  private ProcessorContext&lt;Void, Void&gt; context;
 
   public PopularPageEmailAlert(String emailAddress) {
     this.emailAddress = emailAddress;
   }
 
   @Override
-  public void init(ProcessorContext context) {
+  public void init(ProcessorContext&lt;Void, Void&gt; context) {
     this.context = context;
 
     // Here you would perform any additional initializations such as setting up an email client.
   }
 
   @Override
-  void process(PageId pageId, Long count) {
+  void process(Record&lt;PageId, Long&gt; record) {
     // Here you would format and send the alert email.
     //
-    // In this specific example, you would be able to include information about the page&#39;s ID and its view count
-    // (because the class implements `Processor&lt;PageId, Long&gt;`).
+    // In this specific example, you would be able to include
+    // information about the page&#39;s ID and its view count
   }
 
   @Override
   void close() {
-    // Any code for clean up would go here.  This processor instance will not be used again after this call.
+    // Any code for clean up would go here, for example tearing down the email client and anything
+    // else you created in the init() method
+    // This processor instance will not be used again after this call.

Review comment:
       Expanded on this point a little, since we specifically said we might create the email client during init.
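The `processor-api.html` context quoted elsewhere in this thread notes that Kafka Streams may call `init()` on the same `Processor` object again after `close()`. Given that, the safest approach seems to be pairing resources exactly: create them in `init()` and release them in `close()`. The sketch below uses a hypothetical `FakeEmailClient`, and it leaves out the Kafka `Processor` interface so it runs with only the JDK. The method names simply mirror that lifecycle.

```java
// Hypothetical stand-in for an email client; not a real library class.
final class FakeEmailClient implements AutoCloseable {
    private boolean open = true;

    boolean isOpen() {
        return open;
    }

    void send(String to, String body) {
        if (!open) {
            throw new IllegalStateException("client is closed");
        }
        // a real client would deliver the message here
    }

    @Override
    public void close() {
        open = false;
    }
}

// Sketch of the init()/close() pairing, without the Kafka Processor interface.
class AlertProcessorLifecycle {
    private final String emailAddress;
    private FakeEmailClient client;

    AlertProcessorLifecycle(String emailAddress) {
        this.emailAddress = emailAddress;
    }

    void init() {
        // create per-initialization resources here, so a re-initialized instance gets a fresh client
        client = new FakeEmailClient();
    }

    void process(String pageId, long count) {
        client.send(emailAddress, "Page " + pageId + " has " + count + " views");
    }

    void close() {
        // release everything init() created
        if (client != null) {
            client.close();
            client = null;
        }
    }

    boolean hasOpenClient() {
        return client != null && client.isOpen();
    }

    public static void main(String[] args) {
        AlertProcessorLifecycle processor = new AlertProcessorLifecycle("alerts@example.com");
        processor.init();
        processor.process("page-1", 1000L);
        processor.close();
        System.out.println(processor.hasOpenClient()); // false
        processor.init(); // re-initialization gets a new, open client
        System.out.println(processor.hasOpenClient()); // true
    }
}
```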

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -428,12 +463,20 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
 builder.addSource("Source", "source-topic")
     // add the WordCountProcessor node which takes the source processor as its upstream processor.
     // the ProcessorSupplier provides the count store associated with the WordCountProcessor
-    .addProcessor("Process", new ProcessorSupplier&ltString, String&gt() {
-        public Processor&ltString, String&gt get() {
+    .addProcessor("Process", new ProcessorSupplier&lt;String, String, String, String&gt;() {
+        public Processor&lt;String, String, String, String&gt; get() {
             return new WordCountProcessor();
         }
-        public Set&ltStoreBuilder&lt?&gt&gt stores() {
-            return countStoreBuilder;
+
+        public Set&lt;StoreBuilder&lt;?&gt;&gt; stores() {
+            final StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; countsStoreBuilder =
+                Stores
+                    .keyValueStoreBuilder(
+                        Stores.persistentKeyValueStore("Counts"),
+                        Serdes.String(),
+                        Serdes.Long()
+                    );
+            return Collections.singleton(countsStoreBuilder);

Review comment:
       For this example, it seems more appropriate to depict a self-contained store definition, rather than referencing an externally defined store builder, as in the other example.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
                 <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code>
                     interface:</p>
                 <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address
-public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
+public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, Void&gt; {
 
   private final String emailAddress;
-  private ProcessorContext context;
+  private ProcessorContext&lt;Void, Void&gt; context;
 
   public PopularPageEmailAlert(String emailAddress) {
     this.emailAddress = emailAddress;
   }
 
   @Override
-  public void init(ProcessorContext context) {
+  public void init(ProcessorContext&lt;Void, Void&gt; context) {
     this.context = context;
 
     // Here you would perform any additional initializations such as setting up an email client.
   }
 
   @Override
-  void process(PageId pageId, Long count) {
+  void process(Record&lt;PageId, Long&gt; record) {
     // Here you would format and send the alert email.
     //
-    // In this specific example, you would be able to include information about the page&#39;s ID and its view count
-    // (because the class implements `Processor&lt;PageId, Long&gt;`).
+    // In this specific example, you would be able to include
+    // information about the page&#39;s ID and its view count

Review comment:
       Simplified the comment a little. It seemed misleading, since the reason you get the page id and view count is the processor's position in the topology, not the input type parameters. But it also doesn't seem like an important point to make at all, so I dropped it.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
+    private KeyValueStore&lt;String, Integer&gt; kvStore;
 
-      // retrieve the key-value store named &quot;Counts&quot;
-      kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
+    @Override
+    public void init(final ProcessorContext&lt;String, String> context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
+            try (final KeyValueIterator&lt;String, Integer&gt; iter = kvStore.all()) {
+                while (iter.hasNext()) {
+                    final KeyValue&lt;String, Integer&gt; entry = iter.next();
+                    context.forward(new Record&lt;&gt;(entry.key, entry.value.toString(), timestamp));
+                }
+            }
+        });
+        kvStore = context.getStateStore("Counts");
+    }
 
-      // schedule a punctuate() method every second based on stream-time
-      this.context.schedule(Duration.ofSeconds(1000), PunctuationType.STREAM_TIME, (timestamp) -&gt; {
-          KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
-          while (iter.hasNext()) {
-              KeyValue&lt;String, Long&gt; entry = iter.next();
-              context.forward(entry.key, entry.value.toString());
-          }
-          iter.close();
+    @Override
+    public void process(final Record&lt;String, String&gt; record) {

Review comment:
       This is kind of funny: the old example was actually missing the `process` method! I copied the implementation over from the WordCountProcessorDemo.
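For readers without the demo at hand, the following sketch approximates what a word-count `process()` body does. It lower-cases the value, splits it on non-word characters, and increments a per-word count. A `HashMap` stands in for the `KeyValueStore<String, Integer>` named "Counts", and `WordCountLogic` is a hypothetical name. `merge` is a `HashMap` convenience; against a real `KeyValueStore`, the same update would be a `get` followed by a `put`. The empty-token check is an addition of this sketch and may not match the demo.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Sketch of word-count process() logic; a HashMap stands in for the "Counts" state store.
class WordCountLogic {
    private final Map<String, Integer> kvStore = new HashMap<>();

    // Split the value into words and increment each word's count.
    void process(String value) {
        String[] words = value.toLowerCase(Locale.ROOT).split("\\W+");
        for (String word : words) {
            if (word.isEmpty()) {
                continue; // split() yields a leading empty token when the value starts with a separator
            }
            kvStore.merge(word, 1, Integer::sum);
        }
    }

    Integer count(String word) {
        return kvStore.get(word);
    }

    public static void main(String[] args) {
        WordCountLogic logic = new WordCountLogic();
        logic.process("Hello hello world");
        logic.process("world!");
        System.out.println(logic.count("hello") + " " + logic.count("world")); // 2 2
    }
}
```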

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3492,7 +3494,6 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
                     </ul>
                 </div>
                 <p>Then we can leverage the <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the DSL via <code class="docutils literal"><span class="pre">KStream#process</span></code>.</p>
-                <p>In Java 8+, using lambda expressions:</p>

Review comment:
       We dropped support for Java 7 a while ago, so I reckon we can just document Java 8 now. But I didn't want to go on a crusade through the docs either. I touched this section because it's related to my docs task.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String, String, String&gt; {

Review comment:
       Adding in the new output type bounds. The rest of the example is also updated to the new PAPI (type bounds on ProcessorContext, Record, etc.)

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
##########
@@ -54,47 +52,46 @@
  * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
  */
 public final class WordCountProcessorDemo {
-
-    static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
+    static class WordCountProcessor implements Processor<String, String, String, String> {

Review comment:
       This might be out of scope here, but I visited this class as a reference for my docs update. It seems more natural to explicitly define the processor and then use a lambda for the supplier than to explicitly define the supplier and use a lambda for the processor.
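Defining the processor as a named class has a practical upside: the supplier becomes a one-line constructor reference, which naturally returns a fresh instance on every call. The `ProcessorSupplier` Javadoc asks for a new instance per `get()`. The sketch below uses `java.util.function.Supplier` and a hypothetical `CountingProcessor` as stand-ins for the Kafka types. It demonstrates the pattern only, not the Kafka API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical processor-like class with per-instance state (a stand-in, not a Kafka type).
class CountingProcessor {
    final Map<String, Integer> counts = new HashMap<>();

    void process(String word) {
        counts.merge(word, 1, Integer::sum);
    }
}

class SupplierDemo {
    public static void main(String[] args) {
        // The processor is a named class; the supplier is just a constructor reference.
        // Each get() builds a fresh instance, so two tasks never share processor state.
        Supplier<CountingProcessor> supplier = CountingProcessor::new;

        CountingProcessor task0 = supplier.get();
        CountingProcessor task1 = supplier.get();
        task0.process("kafka");

        System.out.println(task0 != task1);          // true
        System.out.println(task1.counts.isEmpty());  // true
    }
}
```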







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668856588



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
-                    If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams processes records.
+                  If there are no records to process, or if Streams is waiting for new records
+                  due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
       Thanks for pointing this out. It's not right to put a hyphen right here because "time" is a noun and "stream" is an adjective in this usage, as opposed to something like "stream-time punctuation", in which case "punctuation" is the noun, and "stream-time" is effectively a compound word acting as a single adjective.
   
   In some cases, it would be fine to say that "stream-time" could be used here as a compound word functioning as a noun, but for some reason, that doesn't sound right to me in this sentence.
   
   That said, compound word usage in English is pretty idiomatic, and I'm not sure anyone could claim there's a solid rule governing this sentence. All I can say is that "stream time" sounds right to me :) 







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668839336



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle records in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called "punctuation")
+              in your processor's <code class="docutils literal"><span class="pre">init()</span></code>

Review comment:
       Now that you mention it, I see that that's true. Should we actually suggest doing that, though? It seems kind of advanced for this level of documentation.







[GitHub] [kafka] mjsax commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r669292416



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle records in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called "punctuation")
+              in your processor's <code class="docutils literal"><span class="pre">init()</span></code>

Review comment:
       Don't have a strong opinion -- but it's actually quite useful -- for example, you could register a punctuation for a specific input record (that you buffer in a state store) to emit it later on.
   
   At the moment, it sounds like you can only register a punctuation within `init()`, which seems wrong.
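mjsax's scenario can be sketched without Kafka. The processor buffers a record in `process()` and schedules a one-off punctuation there that emits the record later and then cancels itself. `ManualScheduler` and `DelayedEmitter` are hypothetical stand-ins driven by a manually advanced clock. They loosely mirror `ProcessorContext#schedule()` returning a `Cancellable`, but nothing here is the actual Kafka Streams API. The emit-then-cancel policy is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

// Hypothetical scheduler driven by a manually advanced clock; not Kafka code.
class ManualScheduler {
    static final class Handle {
        final long intervalMs;
        final LongConsumer callback;
        long next;
        boolean cancelled;

        Handle(long next, long intervalMs, LongConsumer callback) {
            this.next = next;
            this.intervalMs = intervalMs;
            this.callback = callback;
        }

        void cancel() {
            cancelled = true;
        }
    }

    private final List<Handle> handles = new ArrayList<>();

    Handle schedule(long now, long intervalMs, LongConsumer callback) {
        Handle h = new Handle(now + intervalMs, intervalMs, callback);
        handles.add(h);
        return h;
    }

    // Fire every live schedule that is due; iterate over a copy so callbacks may cancel themselves.
    void advanceTo(long time) {
        for (Handle h : new ArrayList<>(handles)) {
            if (!h.cancelled && time >= h.next) {
                h.callback.accept(time);
                h.next = time + h.intervalMs;
            }
        }
        handles.removeIf(h -> h.cancelled);
    }
}

// Buffers each record and schedules a one-off punctuation from process() to emit it later.
class DelayedEmitter {
    private final ManualScheduler scheduler;
    private final long delayMs;
    private final Map<String, String> buffer = new HashMap<>(); // stand-in for a state store
    final List<String> emitted = new ArrayList<>();

    DelayedEmitter(ManualScheduler scheduler, long delayMs) {
        this.scheduler = scheduler;
        this.delayMs = delayMs;
    }

    void process(String key, String value, long now) {
        buffer.put(key, value);
        ManualScheduler.Handle[] self = new ManualScheduler.Handle[1];
        self[0] = scheduler.schedule(now, delayMs, time -> {
            String buffered = buffer.remove(key);
            if (buffered != null) {
                emitted.add(key + "=" + buffered);
            }
            self[0].cancel(); // one-off: cancel after the first firing
        });
    }

    public static void main(String[] args) {
        ManualScheduler scheduler = new ManualScheduler();
        DelayedEmitter emitter = new DelayedEmitter(scheduler, 100L);
        emitter.process("a", "1", 0L);
        scheduler.advanceTo(50L);
        System.out.println(emitter.emitted); // []
        scheduler.advanceTo(100L);
        System.out.println(emitter.emitted); // [a=1]
        scheduler.advanceTo(300L);
        System.out.println(emitter.emitted); // [a=1]
    }
}
```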







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r669947367



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called "punctuation")
+              in your processor's <code class="docutils literal"><span class="pre">init()</span></code>

Review comment:
       Hmm, that actually does sound really useful. I never thought of it before. I'll file a ticket to document this use case. Thanks!
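The quoted docs text says the four type parameters `KIn, VIn, KOut, VOut` constrain what `process()` receives and what `ProcessorContext#forward()` accepts. It also recommends `Void` as the output type for processors that never forward. The sketch below models that idea with hypothetical stand-in interfaces (`MiniProcessor`, `MiniContext`). These are not Kafka's actual `org.apache.kafka.streams.processor.api` types. The sketch shows why `Void` works: `null` is the only value assignable to it, so the compiler rejects any real forward.

```java
import java.util.ArrayList;
import java.util.List;

public class GenericsSketch {
    // Hypothetical stand-in for a processor context: forward() only accepts KOut/VOut.
    public interface MiniContext<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    // Hypothetical stand-in for a processor with separate input and output type parameters.
    public interface MiniProcessor<KIn, VIn, KOut, VOut> {
        void process(MiniContext<KOut, VOut> context, KIn key, VIn value);
    }

    // Input is <String, String>, output is <String, Integer>: the types may differ.
    public static class LengthProcessor implements MiniProcessor<String, String, String, Integer> {
        @Override
        public void process(MiniContext<String, Integer> context, String key, String value) {
            context.forward(key, value.length());
        }
    }

    // Never forwards, so both output types are Void; a call such as
    // context.forward("k", 1) would not compile here.
    public static class SinkProcessor implements MiniProcessor<String, String, Void, Void> {
        public final List<String> seen = new ArrayList<>();

        @Override
        public void process(MiniContext<Void, Void> context, String key, String value) {
            seen.add(key + "=" + value);
        }
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();
        MiniContext<String, Integer> ctx = (k, v) -> forwarded.add(k + ":" + v);
        new LengthProcessor().process(ctx, "a", "hello");
        System.out.println(forwarded); // prints [a:5]

        SinkProcessor sink = new SinkProcessor();
        sink.process((k, v) -> { throw new IllegalStateException("unexpected forward"); }, "b", "world");
        System.out.println(sink.seen); // prints [b=world]
    }
}
```

With this in mind, choosing `Object` for the output types (as the docs suggest for unrelated output types) simply trades this compile-time check for flexibility.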







[GitHub] [kafka] vvcephei merged pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #10994:
URL: https://github.com/apache/kafka/pull/10994


   





[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668841230



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>

Review comment:
       Ayayay. Thanks!







[GitHub] [kafka] vvcephei commented on pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#issuecomment-879184656


   Thanks so much for the reviews, everyone!
   
   I've merged and cherry-picked to 3.0 (cc @kkonstantine )





[GitHub] [kafka] vvcephei commented on pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#issuecomment-875972075


   Here are the rendered docs for the reviewers:
   
   ![dsl-api](https://user-images.githubusercontent.com/832787/124835667-4e0bcb80-df47-11eb-837a-a3f3537223cf.png)
   
   ![processor-api](https://user-images.githubusercontent.com/832787/124835685-5532d980-df47-11eb-8625-89e76fffb099.png)
   





[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668842244



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:

Review comment:
       ```suggestion
               data class. This class gives you access to the main components of a Kafka record:
   ```







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668848827



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties

Review comment:
       Thanks for this thought. I tend to say "properties" when I want to refer to the components of a complex type in the abstract.
   
   "Field" refers to the actual instance member that stores the pointer or primitive. For example, the fields of Record are private, so we actually don't give access to the fields per se. We do provide methods that get or set those fields (on new or existing instances). So the idea of saying "property" is to refer to the key, value, timestamp, or header itself without specifically mentioning whether it's stored in a field, variable, constant, or accessed/set via constructors, methods, etc.
   
   "Element" would also work, but I think I still favor "property", since people tend to refer to the contents of a data collection as its elements.
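The discussion above distinguishes a record's "properties" (key, value, timestamp) from the private fields that store them. The quoted docs text also says `withValue()` returns a shallow copy without mutating the input. The sketch below illustrates both points with a hypothetical, simplified `SimpleRecord` class, not Kafka's actual `Record`. Its fields are private, its properties are reachable only through accessors and a `withValue()` method, and the copy shares references to the untouched properties.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordCopySketch {
    // Hypothetical, simplified immutable record: private fields, public accessors.
    public static final class SimpleRecord<K, V> {
        private final K key;
        private final V value;
        private final long timestamp;

        public SimpleRecord(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        public K key() { return key; }
        public V value() { return value; }
        public long timestamp() { return timestamp; }

        // Returns a new instance with a different value. The key and timestamp are
        // copied by reference (a shallow copy); this instance is left unchanged.
        public <NewV> SimpleRecord<K, NewV> withValue(NewV newValue) {
            return new SimpleRecord<>(key, newValue, timestamp);
        }
    }

    public static void main(String[] args) {
        List<String> mutableKey = new ArrayList<>(List.of("user-1"));
        SimpleRecord<List<String>, String> input = new SimpleRecord<>(mutableKey, "click", 1000L);
        SimpleRecord<List<String>, Integer> output = input.withValue(42);

        System.out.println(input.value());  // prints click (input is not mutated)
        System.out.println(output.value()); // prints 42

        // Because the copy is shallow, both records share the same key object,
        // so mutating it through one record is visible through the other.
        output.key().add("user-2");
        System.out.println(input.key());    // prints [user-1, user-2]
    }
}
```

The last lines show why the docs advise making deep copies yourself if you plan to mutate a key, value, or headers elsewhere.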







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668857339



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
+    private KeyValueStore&lt;String, Integer&gt; kvStore;
 
-      // retrieve the key-value store named &quot;Counts&quot;
-      kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
+    @Override
+    public void init(final ProcessorContext&lt;String, String> context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {

Review comment:
       ```suggestion
           context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -&gt; {
   ```
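The suggestion above escapes the `>` of the lambda arrow. This matches how the page already encodes `<`, `>` and `"` as `&lt;`, `&gt;` and `&quot;` inside its `<pre><code>` blocks; an unescaped `<` in particular could be parsed as the start of a tag. Here is a minimal sketch of a helper that could apply that encoding to a Java snippet before pasting it into the page. The `escapeForHtml` name is hypothetical and not part of the Kafka docs tooling.

```java
public class HtmlEscapeSketch {
    // Escapes characters that are significant in HTML text and attribute content.
    // A single pass over the input means an '&' produced by an earlier
    // replacement is never escaped a second time.
    public static String escapeForHtml(String code) {
        StringBuilder sb = new StringBuilder(code.length());
        for (int i = 0; i < code.length(); i++) {
            char c = code.charAt(i);
            switch (c) {
                case '&': sb.append("&amp;"); break;
                case '<': sb.append("&lt;"); break;
                case '>': sb.append("&gt;"); break;
                case '"': sb.append("&quot;"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(escapeForHtml("timestamp -> {"));
        // prints timestamp -&gt; {
        System.out.println(escapeForHtml("Processor<String, String, String, String>"));
        // prints Processor&lt;String, String, String, String&gt;
    }
}
```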







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668842117



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:

Review comment:
       Good point!







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668856588



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
-                    If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams processes records.
+                  If there are no records to process, or if Streams is waiting for new records
+                  due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
       Thanks for pointing this out. I left out the hyphen here because "time" is a noun and "stream" is an adjective in this usage, as opposed to something like "stream-time punctuation", in which case "punctuation" is the noun, and "stream-time" is effectively a compound word acting as a single adjective.
   
   In some cases, it would be fine to say that "stream-time" could be used here as a compound word functioning as a noun, but for some reason, that doesn't sound right to me in this sentence.
   
   That said, compound word usage in English is pretty idiomatic, and I'm not sure anyone could claim there's a solid rule governing this sentence. All I can say is that "stream time" sounds right to me :) 
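The revised Attention note says stream time only advances when Streams processes records, so a `PunctuationType.STREAM_TIME` punctuation cannot fire while no records arrive. The sketch below is a deliberately simplified model of that behavior, not Kafka's internal scheduler. It assumes that stream time is the largest record timestamp seen so far and that the first record starts the schedule. It further assumes that a punctuation fires at most once per processed record when stream time reaches the next scheduled time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public class StreamTimeSketch {
    // Simplified, hypothetical model of stream-time punctuation.
    public static final class StreamTimeClock {
        private final long intervalMs;
        private final LongConsumer punctuator;
        private boolean started = false;
        private long streamTime;
        private long nextPunctuation;

        public StreamTimeClock(long intervalMs, LongConsumer punctuator) {
            this.intervalMs = intervalMs;
            this.punctuator = punctuator;
        }

        public long streamTime() { return streamTime; }

        // Stream time moves only here, i.e. only when a record is processed.
        public void process(long recordTimestamp) {
            if (!started) {
                started = true;
                streamTime = recordTimestamp;
                nextPunctuation = recordTimestamp; // model assumption: first record starts the schedule
            } else {
                streamTime = Math.max(streamTime, recordTimestamp); // out-of-order records don't move it back
            }
            if (streamTime >= nextPunctuation) {
                punctuator.accept(streamTime);
                // Skip any intervals that were jumped over, so this model fires once per record at most.
                long missed = (streamTime - nextPunctuation) / intervalMs;
                nextPunctuation += (missed + 1) * intervalMs;
            }
        }
    }

    public static void main(String[] args) {
        List<Long> fired = new ArrayList<>();
        StreamTimeClock clock = new StreamTimeClock(1000L, t -> fired.add(t));
        clock.process(0L);    // stream time 0: first punctuation
        clock.process(500L);  // stream time 500: not yet due
        clock.process(300L);  // out-of-order record: stream time stays at 500
        clock.process(2500L); // stream time 2500: due, fires once
        // With no further records, stream time stays at 2500 and nothing else
        // fires, however much wall-clock time passes.
        System.out.println(fired); // prints [0, 2500]
    }
}
```

A `WALL_CLOCK_TIME` punctuation, by contrast, would be driven by the system clock and could fire even when no records are processed.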







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668849485



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle records in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called "punctuation")
+              in your processor's <code class="docutils literal"><span class="pre">init()</span></code>

Review comment:
       I think I'll go ahead and merge to get this off my plate. If you do want to make this point, I can do a follow-on PR.
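   The shallow-copy caveat in the Record paragraph of this diff is easy to miss, so here is a small illustration. It uses a hypothetical SimpleRecord class that only mimics the copy semantics the diff describes for withValue(); it is not Kafka's Record class, and as an assumption it uses a plain Map<String, String> for headers, whereas the real class uses Kafka's Headers type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka Streams' Record<K, V>, used only to illustrate
// shallow-copy semantics. It is NOT the real Record class.
public class ShallowCopyDemo {

    static final class SimpleRecord<K, V> {
        final K key;
        final V value;
        final long timestamp;
        final Map<String, String> headers; // mutable object, shared by shallow copies

        SimpleRecord(K key, V value, long timestamp, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }

        // Mirrors the idea of withValue(): a new object with a new value and the
        // SAME key, timestamp, and headers references (a shallow copy).
        <NewV> SimpleRecord<K, NewV> withValue(NewV newValue) {
            return new SimpleRecord<>(key, newValue, timestamp, headers);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("source", "sensor-1");
        SimpleRecord<String, String> input = new SimpleRecord<>("k", "hello", 42L, headers);

        // The input record itself is not mutated: its value is still "hello".
        SimpleRecord<String, Integer> output = input.withValue(5);

        // Pitfall: the headers map is shared, so mutating it through the copy
        // is visible through the input record too.
        output.headers.put("processed", "true");
        System.out.println(input.value + " " + input.headers.containsKey("processed")); // prints "hello true"

        // Fix: give the output its own copy of the mutable field. Copying the map
        // is enough here because its String entries are immutable.
        SimpleRecord<String, Integer> isolated = new SimpleRecord<>(
            input.key, 5, input.timestamp, new HashMap<>(input.headers));
        isolated.headers.put("only-here", "true");
        System.out.println(input.headers.containsKey("only-here")); // prints "false"
    }
}
```

   Whether a map copy is "deep enough" depends on the field: if a key or value is itself mutable, it needs its own copy too.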







[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668838008



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
+    private KeyValueStore&lt;String, Integer&gt; kvStore;
 
-      // retrieve the key-value store named &quot;Counts&quot;
-      kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
+    @Override
+    public void init(final ProcessorContext&lt;String, String> context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {

Review comment:
       Good catch!
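   The word-count processor being reviewed here does two things: it counts words in process() and periodically forwards the counts from the punctuator registered with context.schedule(...). The following plain-Java sketch mirrors that split without depending on Kafka: a TreeMap stands in for the "Counts" key-value store and a BiConsumer stands in for ProcessorContext#forward(). The class and method names are hypothetical, committing is not modeled, and counts are forwarded as strings to match the Processor<String, String, String, String> signature in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical plain-Java sketch of the word-count logic; NOT the Kafka Streams API.
public class WordCountSketch {
    // Stand-in for the "Counts" KeyValueStore<String, Integer> (sorted for stable output).
    private final Map<String, Integer> counts = new TreeMap<>();
    // Stand-in for ProcessorContext#forward(), with String keys and values.
    private final BiConsumer<String, String> forward;

    public WordCountSketch(BiConsumer<String, String> forward) {
        this.forward = forward;
    }

    // Like process(): split the value into words and update each word's count.
    public void process(String value) {
        for (String word : value.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue; // a leading separator yields one empty token
            }
            counts.merge(word, 1, Integer::sum);
        }
    }

    // Like the punctuator passed to context.schedule(): forward every count.
    // The timestamp is unused here; a real punctuator receives the punctuation time.
    public void punctuate(long timestamp) {
        counts.forEach((word, count) -> forward.accept(word, count.toString()));
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();
        WordCountSketch processor = new WordCountSketch((k, v) -> forwarded.add(k + "=" + v));
        processor.process("Hello, hello world");
        processor.punctuate(1000L);
        System.out.println(forwarded); // prints "[hello=2, world=1]"
    }
}
```

   Keeping the store updates in process() and the forwarding in the punctuator means downstream sees one batch of counts per punctuation rather than one update per input word.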







[GitHub] [kafka] mjsax commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r669293544



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
-                    If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams processes records.
+                  If there are no records to process, or if Streams is waiting for new records
+                  due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
       I guess we use `stream-time` as a noun throughout the documentation... But anyway. Not a big deal.



