You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/05 20:13:33 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8603: MINOR: Fix ProcessorContext JavaDocs

vvcephei commented on a change in pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#discussion_r420377361



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval,
     <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
 
     /**
-     * Requests a commit
+     * Requests a commit.
      */
     void commit();
 
     /**
      * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the topic name
      */
     String topic();
 
     /**
      * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the partition id
      */
     int partition();
 
     /**
      * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not available
+     * Returns the headers of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
      * @return the headers
      */
     Headers headers();
 
     /**
      * Returns the current timestamp.
      *
-     * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
      *
-     * If it is triggered while processing a record generated not from the source processor (for example,
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
      * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the smallest among all its input stream partition timestamps.
+     * task's stream time, which is defined as the largest among all its input stream partition timestamps.

Review comment:
       I just took another look at the definition of streamTime, and it actually looks like it might be computed wrongly.
   
   The way it works is that the "stream time" for a task is computed most of the time in `org.apache.kafka.streams.processor.internals.PartitionGroup#nextRecord`, i.e., it's the max timestamp of any record _polled from the PartitionGroup_.
   
   However, when we commit, we commit the "partition time" for each TopicPartition, which is set when we move a record into the head position for that queue. During restoration, we read these committed timestamps for each TopicPartition, and we (incorrectly) set the "stream time" to be the maximum over the "partition time" of each partition in the PartitionGroup (aka Task).
   
   This is incorrect in two ways:
   1. it should be the minimum, not the maximum (since we would choose the record with the minimum timestamp to process next)
   2. the timestamp of the _head enqueued_ record (partition time) is not the timestamp of the _last dequeued_ record (stream time).
   
   I'll file a Jira ticket capturing all this. In the mean time, I'd suggest that we just update the docs to reflect the correct definition of "stream time": `which is defined as the largest timestamp of any record processed by the task`. Then, we can fix the code to make this true all the time. Currently, it's only true in steady state, not immediately after restoration.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org