Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/13 00:03:17 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

mjsax commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668331269



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties

Review comment:
       `properties` -> `fields` (or `elements`) ?
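
       For reference, here is a minimal sketch of the builder methods this paragraph describes. It assumes kafka-streams 3.0+ (the `org.apache.kafka.streams.processor.api` package) is on the classpath. `RecordBuilderExample` and its method names are hypothetical and are not part of Kafka.

```java
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical helper class; assumes kafka-streams 3.0+ on the classpath.
public final class RecordBuilderExample {
    private RecordBuilderExample() {}

    // withValue() returns a new Record that carries over the key, timestamp and
    // headers of the input. The input record itself is not modified.
    public static Record<String, Integer> valueToLength(final Record<String, String> input) {
        return input.withValue(input.value().length());
    }

    // Builder calls can be chained to replace more than one field.
    public static Record<String, String> rekeyAndRestamp(final Record<String, String> input,
                                                         final String newKey,
                                                         final long newTimestamp) {
        return input.withKey(newKey).withTimestamp(newTimestamp);
    }
}
```

       Each `withX` call returns a new `Record`, so the input record can be forwarded unchanged or reused after the call.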

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:

Review comment:
       `key` -> `main` (to avoid confusion with "record key") ?
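
       To illustrate the four parts the sentence lists (key, value, timestamp and headers), here is a small sketch that builds a `Record` from scratch and then reads each part back. It assumes kafka-streams 3.0+ on the classpath. The class name and the `"source"` header are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical helper class; assumes kafka-streams 3.0+ on the classpath.
public final class RecordPartsExample {
    private RecordPartsExample() {}

    // Builds a Record from scratch with the four-argument constructor.
    // The "source" header name is illustrative only, not a Kafka convention.
    public static Record<String, String> create(final String key, final String value,
                                                final long timestamp, final String source) {
        final RecordHeaders headers = new RecordHeaders();
        headers.add("source", source.getBytes(StandardCharsets.UTF_8));
        return new Record<>(key, value, timestamp, headers);
    }

    // Reads the four main parts of a Record: key, value, timestamp and headers.
    public static String describe(final Record<String, String> record) {
        final Header source = record.headers().lastHeader("source"); // null if absent
        final String sourceValue = source == null
            ? "none"
            : new String(source.value(), StandardCharsets.UTF_8);
        return record.key() + "=" + record.value() + "@" + record.timestamp()
            + " source=" + sourceValue;
    }
}
```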

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
-                    If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams processes records.
+                  If there are no records to process, or if Streams is waiting for new records
+                  due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
       `stream time` -> `stream-time` 
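
       For context on the Task Idling link in this hunk, here is a sketch of where that setting goes. It uses only the JDK. `"max.task.idle.ms"` is the config key behind `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG`. The helper name and the `500` ms value are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper; only the "max.task.idle.ms" key comes from Kafka Streams.
public final class TaskIdlingConfigExample {
    private TaskIdlingConfigExample() {}

    // Returns a copy of the base Streams properties with max.task.idle.ms set.
    // While a task idles waiting for data on some of its input partitions, it
    // processes no records, so stream-time does not advance and STREAM_TIME
    // punctuators are not triggered.
    public static Properties withTaskIdling(final Properties base, final long maxTaskIdleMs) {
        final Properties props = new Properties();
        props.putAll(base);
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }
}
```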

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a Kafka record:
+            the key, value, timestamp and headers. When forwarding records, you can use the
+            constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to replace one of the
+            <code class="docutils literal"><span class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called "punctuation")
+              in your processor's <code class="docutils literal"><span class="pre">init()</span></code>

Review comment:
       You can also schedule a punctuation within `process()`.
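
       Here is a minimal sketch of scheduling from `process()` instead of `init()`. It assumes kafka-streams 3.0+, where `ProcessorContext#schedule()` returns a `Cancellable`. The class name, the `"count"` output key and the 10-second interval are hypothetical. The processor keeps the `Cancellable` so that it schedules only once and can cancel on `close()`, since Streams may call `init()` again on the same instance.

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical processor; assumes kafka-streams 3.0+ on the classpath.
public class LazyPunctuationProcessor implements Processor<String, String, String, Long> {
    private ProcessorContext<String, Long> context;
    private Cancellable punctuation; // null until the first record arrives
    private long seen = 0L;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        seen++;
        if (punctuation == null) {
            // Schedule lazily, on the first record, rather than in init().
            punctuation = context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward(new Record<String, Long>("count", seen, timestamp)));
        }
    }

    // Visible for testing: whether a punctuation is currently scheduled.
    boolean punctuationScheduled() {
        return punctuation != null;
    }

    @Override
    public void close() {
        if (punctuation != null) {
            punctuation.cancel();
            punctuation = null; // allow a fresh schedule if init() is called again
        }
    }
}
```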

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
+    private KeyValueStore&lt;String, Integer&gt; kvStore;
 
-      // retrieve the key-value store named &quot;Counts&quot;
-      kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
+    @Override
+    public void init(final ProcessorContext&lt;String, String> context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {

Review comment:
       `->` -> `-&gt;` ?
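
       HTML does not strictly require `>` to be escaped in text content. Escaping it does keep the snippet consistent with the `&lt;`/`&gt;`/`&quot;` style used elsewhere in this file. The helper below is a hypothetical JDK-only sketch of that escaping for Java code pasted into a `<pre><code>` block. It is not part of Kafka's docs tooling.

```java
// Hypothetical helper for preparing Java snippets for <pre><code> blocks.
public final class PreCodeEscaper {
    private PreCodeEscaper() {}

    // Escapes '&', '<', '>' and '"' character by character, so an '&' that is
    // produced by escaping is never escaped a second time.
    public static String escape(final String javaSource) {
        final StringBuilder out = new StringBuilder(javaSource.length());
        for (int i = 0; i < javaSource.length(); i++) {
            final char c = javaSource.charAt(i);
            switch (c) {
                case '&': out.append("&amp;"); break;
                case '<': out.append("&lt;"); break;
                case '>': out.append("&gt;"); break;
                case '"': out.append("&quot;"); break;
                default: out.append(c);
            }
        }
        return out.toString();
    }
}
```

       For example, `escape("timestamp -> {")` yields `timestamp -&gt; {`.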

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
               <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
               <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+            to <code class="docutils literal"><span class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+            define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all (or if it only forwards
+            <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common superclass, you will
+            have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>

Review comment:
       `precords` -> `records`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
