Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/02 20:50:09 UTC

[GitHub] [kafka] mjsax opened a new pull request #8603: MINOR: Fix ProcessorContext JavaDocs

mjsax opened a new pull request #8603:
URL: https://github.com/apache/kafka/pull/8603


   We changed how "stream time" is computed in the `2.3` release. This should be cherry-picked to older branches.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8603: MINOR: Fix ProcessorContext JavaDocs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#issuecomment-623240662


   Java 8 and 14 passed.
   Java 11: `org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication`




[GitHub] [kafka] mjsax commented on pull request #8603: MINOR: Fix ProcessorContext JavaDocs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#issuecomment-624970217


   Java 8: `org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableJoin[exactly_once_beta]`
   Java 11: timed out
   Java 14:
   ```
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.shouldKStreamGlobalKTableLeftJoin
   ```




[GitHub] [kafka] mjsax commented on a change in pull request #8603: MINOR: Fix ProcessorContext JavaDocs

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#discussion_r420463508



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval,
     <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
 
     /**
-     * Requests a commit
+     * Requests a commit.
      */
     void commit();
 
     /**
      * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the topic name
      */
     String topic();
 
     /**
      * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the partition id
      */
     int partition();
 
     /**
      * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not available
+     * Returns the headers of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
      * @return the headers
      */
     Headers headers();
 
     /**
      * Returns the current timestamp.
      *
-     * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
      *
-     * If it is triggered while processing a record generated not from the source processor (for example,
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
      * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the smallest among all its input stream partition timestamps.
+     * task's stream time, which is defined as the largest among all its input stream partition timestamps.

Review comment:
       Sounds like a bug :)
   
   But it seems to be a one-line fix that I can piggy-back on this PR. We advance the "partition time" too early. If we advance it when we return the record for processing, all should be fixed? Partition time should not be the head-record timestamp but the timestamp of the last processed record of the partition.
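A minimal sketch of the timing change described above, assuming a simplified per-partition FIFO that holds only record timestamps. `ToyRecordQueue` and its methods are hypothetical and are not Kafka's internal `RecordQueue` API; the sketch only shows partition time moving when a record is handed to processing, not when it becomes the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of one input partition's buffer.
// NOT Kafka's RecordQueue: it only illustrates *when* partition time advances.
class ToyRecordQueue {
    static final long UNKNOWN = -1L;

    private final Deque<Long> timestamps = new ArrayDeque<>();
    private long partitionTime = UNKNOWN;

    void add(long timestamp) {
        // Buffering a record does not touch partition time,
        // even if this record becomes the new head of the queue.
        timestamps.addLast(timestamp);
    }

    long headTimestamp() {
        Long head = timestamps.peekFirst();
        return head == null ? UNKNOWN : head;
    }

    // Hands the head record to processing. Caller must ensure the queue is non-empty.
    long poll() {
        long ts = timestamps.removeFirst();
        // Advance only here, when the record is returned for processing.
        // Math.max keeps partition time non-decreasing (a choice made in this sketch).
        partitionTime = Math.max(partitionTime, ts);
        return ts;
    }

    long partitionTime() {
        return partitionTime;
    }
}
```

With records at 5, 3 and 8 buffered, the head is 5 but partition time is still unknown; after the first poll it becomes 5, and it stays 5 after the out-of-order record at 3 is polled.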






[GitHub] [kafka] mjsax commented on pull request #8603: MINOR: Fix ProcessorContext JavaDocs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#issuecomment-623011501


   Call for review @vvcephei 




[GitHub] [kafka] mjsax commented on pull request #8603: MINOR: Fix ProcessorContext JavaDocs

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#issuecomment-624367644


   @vvcephei Updated this.




[GitHub] [kafka] vvcephei commented on a change in pull request #8603: MINOR: Fix ProcessorContext JavaDocs

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#discussion_r420377361



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval,
     <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
 
     /**
-     * Requests a commit
+     * Requests a commit.
      */
     void commit();
 
     /**
      * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the topic name
      */
     String topic();
 
     /**
      * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the partition id
      */
     int partition();
 
     /**
      * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not available
+     * Returns the headers of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
      * @return the headers
      */
     Headers headers();
 
     /**
      * Returns the current timestamp.
      *
-     * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
      *
-     * If it is triggered while processing a record generated not from the source processor (for example,
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
      * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the smallest among all its input stream partition timestamps.
+     * task's stream time, which is defined as the largest among all its input stream partition timestamps.

Review comment:
       I just took another look at the definition of streamTime, and it actually looks like it might be computed wrongly.
   
   The way it works is that the "stream time" for a task is computed most of the time in `org.apache.kafka.streams.processor.internals.PartitionGroup#nextRecord`, i.e., it's the max timestamp of any record _polled from the PartitionGroup_.
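The steady-state behavior described above can be sketched as follows. `ToyPartitionGroup` is a hypothetical stand-in for `PartitionGroup`, reduced to timestamps only: `nextRecord()` picks the partition whose head has the smallest timestamp and advances stream time to the largest timestamp polled so far. The real class does considerably more, none of which is modeled here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of steady-state stream-time tracking; NOT Kafka's PartitionGroup.
// Each partition is a FIFO of record timestamps.
class ToyPartitionGroup {
    static final long UNKNOWN = -1L;

    private final Map<String, Deque<Long>> queues = new LinkedHashMap<>();
    private long streamTime = UNKNOWN;

    void add(String partition, long timestamp) {
        queues.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(timestamp);
    }

    // Returns the timestamp of the next record to process, or UNKNOWN if all queues are empty.
    long nextRecord() {
        Deque<Long> chosen = null;
        // Choose the non-empty partition whose head record has the smallest timestamp.
        for (Deque<Long> q : queues.values()) {
            if (!q.isEmpty() && (chosen == null || q.peekFirst() < chosen.peekFirst())) {
                chosen = q;
            }
        }
        if (chosen == null) {
            return UNKNOWN;
        }
        long ts = chosen.removeFirst();
        // Stream time = max timestamp of any record polled from the group so far.
        streamTime = Math.max(streamTime, ts);
        return ts;
    }

    long streamTime() {
        return streamTime;
    }
}
```

With `A = [10, 30]` and `B = [20, 5]`, records come out as 10, 20, 5, 30 and stream time goes 10, 20, 20, 30: the out-of-order record at 5 does not move it backwards.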
   
   However, when we commit, we commit the "partition time" for each TopicPartition, which is set when we move a record into the head position for that queue. During restoration, we read these committed timestamps for each TopicPartition, and we (incorrectly) set the "stream time" to be the maximum over the "partition time" of each partition in the PartitionGroup (aka Task).
   
   This is incorrect in two ways:
   1. it should be the minimum, not the maximum (since we would choose the record with the minimum timestamp to process next)
   2. the timestamp of the _head enqueued_ record (partition time) is not the timestamp of the _last dequeued_ record (stream time).
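A small numeric illustration of the second point, assuming two partitions where some records have been processed from each and another record is already buffered at the head. `HeadVsProcessedExample` and `Snapshot` are hypothetical names used only for this illustration; nothing here is Kafka code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration (not Kafka code) of "head enqueued" vs. "processed" timestamps.
class HeadVsProcessedExample {
    // Per-partition snapshot; field names are illustrative only.
    static final class Snapshot {
        final long maxProcessed; // largest timestamp already handed to processing
        final long head;         // timestamp of the buffered, not-yet-processed head record

        Snapshot(long maxProcessed, long head) {
            this.maxProcessed = maxProcessed;
            this.head = head;
        }
    }

    // Restoration as described in the comment: max over head-based "partition times".
    static long maxOfHeads(List<Snapshot> partitions) {
        long max = Long.MIN_VALUE;
        for (Snapshot s : partitions) {
            max = Math.max(max, s.head);
        }
        return max;
    }

    // Steady-state stream time: the largest timestamp actually processed by the task.
    static long maxOfProcessed(List<Snapshot> partitions) {
        long max = Long.MIN_VALUE;
        for (Snapshot s : partitions) {
            max = Math.max(max, s.maxProcessed);
        }
        return max;
    }

    static List<Snapshot> example() {
        return Arrays.asList(
            new Snapshot(10L, 30L),  // partition A
            new Snapshot(20L, 25L)); // partition B
    }
}
```

Here the head-based maximum is 30 although no record newer than 20 has been processed, so restoring from head timestamps would put stream time ahead of its steady-state value.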
   
   I'll file a Jira ticket capturing all this. In the meantime, I'd suggest that we just update the docs to reflect the correct definition of "stream time": `which is defined as the largest timestamp of any record processed by the task`. Then, we can fix the code to make this true all the time. Currently, it's only true in steady state, not immediately after restoration.
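One way the proposed definition could hold after restoration, sketched under the assumption (in line with the suggestion in this thread to advance partition time only when a record is returned for processing) that the value committed per partition is the largest timestamp processed from that partition. `StreamTimeRestore` and its methods are hypothetical and only illustrate the arithmetic; this is not the actual fix.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Kafka code) of restoring stream time defined as
// "the largest timestamp of any record processed by the task".
// Assumption: the committed per-partition value is the max *processed* timestamp.
class StreamTimeRestore {
    static final long UNKNOWN = -1L;

    // What would be committed: per-partition max over processed timestamps.
    static Map<String, Long> committedPartitionTimes(Map<String, List<Long>> processed) {
        Map<String, Long> committed = new HashMap<>();
        for (Map.Entry<String, List<Long>> e : processed.entrySet()) {
            long max = UNKNOWN;
            for (long ts : e.getValue()) {
                max = Math.max(max, ts);
            }
            committed.put(e.getKey(), max);
        }
        return committed;
    }

    // After restoration: max over committed values equals the largest
    // timestamp processed by the task across all its partitions.
    static long restoreStreamTime(Map<String, Long> committed) {
        long streamTime = UNKNOWN;
        for (long ts : committed.values()) {
            streamTime = Math.max(streamTime, ts);
        }
        return streamTime;
    }

    static Map<String, List<Long>> example() {
        Map<String, List<Long>> processed = new HashMap<>();
        processed.put("topic-0", Arrays.asList(10L, 40L, 15L)); // out-of-order record at 15
        processed.put("topic-1", Arrays.asList(20L, 25L));
        return processed;
    }
}
```

For the example data, the committed values are 40 and 25 and the restored stream time is 40, matching the largest timestamp processed by the task even though `topic-0` processed an out-of-order record at 15 after 40.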



