You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/05 23:18:24 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #8603: MINOR: Fix ProcessorContext JavaDocs

mjsax commented on a change in pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#discussion_r420463508



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval,
     <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
 
     /**
-     * Requests a commit
+     * Requests a commit.
      */
     void commit();
 
     /**
      * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the topic name
      */
     String topic();
 
     /**
      * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the partition id
      */
     int partition();
 
     /**
      * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not available
+     * Returns the headers of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
      * @return the headers
      */
     Headers headers();
 
     /**
      * Returns the current timestamp.
      *
-     * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
      *
-     * If it is triggered while processing a record generated not from the source processor (for example,
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
      * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the smallest among all its input stream partition timestamps.
+     * task's stream time, which is defined as the largest among all its input stream partition timestamps.

Review comment:
       Sound like a bug :)
   
   But it seems to be a one line fix that I can piggy-back on this PR. We advance the "partition time" too early. If we advance it when the return the record for processing, all should be fixed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org