Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/16 20:10:35 UTC

[kafka] branch trunk updated: KAFKA-6905: Document that Processors may be re-used by Streams (#5022)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e9154b7  KAFKA-6905: Document that Processors may be re-used by Streams (#5022)
e9154b7 is described below

commit e9154b7960fd9fe9bf05811e4e9972698eeec355
Author: David Glasser <gl...@meteor.com>
AuthorDate: Wed May 16 13:10:21 2018 -0700

    KAFKA-6905: Document that Processors may be re-used by Streams (#5022)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 docs/streams/developer-guide/processor-api.html                     | 6 +++++-
 .../src/main/java/org/apache/kafka/streams/processor/Processor.java | 6 ++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index e3432b7..ef89372 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -79,7 +79,11 @@
                 instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
                 its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
                 function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
-                and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).</p>
+              and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
+              Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
+              <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
+              <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
+              <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
             <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
 	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
 	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index bcdb2f0..e35337f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -29,7 +29,8 @@ public interface Processor<K, V> {
 
     /**
      * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
-     * that contains it is initialized.
+     * that contains it is initialized. When the framework is done with the processor, {@link #close()} will be called on it; the
+     * framework may later re-use the processor by calling {@link #init()} again.
      * <p>
      * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
      * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
@@ -49,7 +50,8 @@ public interface Processor<K, V> {
 
     /**
      * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
-     * Thus, it is not possible to write anything to Kafka as underlying clients are already closed.
+     * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. The framework may
+     * later re-use this processor by calling {@link #init()} on it again.
      * <p>
      * Note: Do not close any streams managed resources, like {@link StateStore}s here, as they are managed by the library.
      */
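
As a rough illustration of what this change implies for implementers, the sketch below shows a processor-like class that rebuilds its state in init() so that it stays usable when init() is called again after close(). It is self-contained and does not use the Kafka Streams API: the Lifecycle interface, the BufferingProcessor class, and the buffered() helper are hypothetical stand-ins, and the real Processor#init takes a ProcessorContext argument.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessorReuseSketch {

    /** Hypothetical stand-in for the lifecycle methods of a processor (not the Kafka interface). */
    interface Lifecycle {
        void init();
        void process(String value);
        void close();
    }

    /** Keeps per-lifecycle state that is created in init() rather than in the constructor. */
    static class BufferingProcessor implements Lifecycle {
        private List<String> buffer;    // resource owned by this object
        private boolean closed = true;  // true until init() has run

        @Override
        public void init() {
            // Re-create all state here, because init() may run again after close().
            buffer = new ArrayList<>();
            closed = false;
        }

        @Override
        public void process(String value) {
            if (closed) {
                throw new IllegalStateException("process() called while closed");
            }
            buffer.add(value);
        }

        @Override
        public void close() {
            // Release resources, but leave the object in a state init() can recover from.
            buffer = null;
            closed = true;
        }

        /** Number of values buffered in the current lifecycle (0 while closed). */
        int buffered() {
            return buffer == null ? 0 : buffer.size();
        }
    }

    public static void main(String[] args) {
        BufferingProcessor p = new BufferingProcessor();
        p.init();
        p.process("a");
        p.process("b");
        System.out.println(p.buffered()); // prints 2
        p.close();

        p.init();                         // the same object is initialized again
        p.process("c");
        System.out.println(p.buffered()); // prints 1
        p.close();
    }
}
```

The design point is that nothing set up in the constructor or in field initializers is torn down in close(); anything close() releases is created again by init(), so a second init() starts from a clean state.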
