Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/27 00:01:10 UTC

[GitHub] [flink] mas-chen opened a new pull request, #20370: [FLINK-28185] Make TimestampOffsetsInitializer apply offset reset str…

mas-chen opened a new pull request, #20370:
URL: https://github.com/apache/flink/pull/20370

   …ategy and handle timestamps that do not map to an offset
   
   ## What is the purpose of the change
   
   This change lets the `TimestampOffsetsInitializer` be initialized with a configured offset reset strategy. The default behavior (LATEST) is preserved. It also fixes a bug that occurs when the timestamp does not correspond to an offset in Kafka, and clarifies the message of the exception that is thrown.
   
   ## Brief change log
   
   - Handles the EARLIEST, LATEST, and NONE offset reset strategies
   - If a timestamp does not correspond to an offset in Kafka and the initializer is configured with NONE, the initializer throws an explicit exception.
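
The fallback described in the change log can be sketched roughly as follows. This is a minimal illustration, not the connector's code: `TimestampFallbackSketch`, `ResetStrategy`, and `resolve` are hypothetical names, plain `String` keys stand in for `TopicPartition`, and the three maps stand in for the results the initializer would obtain from Kafka.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TimestampFallbackSketch {

    /** Hypothetical stand-in for Kafka's OffsetResetStrategy. */
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /**
     * Picks a starting offset for every partition.
     *
     * @param byTimestamp offsets found for the timestamp; a missing key means the
     *     timestamp did not correspond to any offset in that partition
     */
    static Map<String, Long> resolve(
            List<String> partitions,
            Map<String, Long> byTimestamp,
            Map<String, Long> beginningOffsets,
            Map<String, Long> endOffsets,
            ResetStrategy strategy) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : partitions) {
            Long offset = byTimestamp.get(partition);
            if (offset == null) {
                // No offset matched the timestamp: apply the configured strategy.
                switch (strategy) {
                    case EARLIEST:
                        offset = beginningOffsets.get(partition);
                        break;
                    case LATEST:
                        offset = endOffsets.get(partition);
                        break;
                    default:
                        // NONE: fail loudly instead of silently choosing an offset.
                        throw new IllegalStateException(
                                "No offset found for the timestamp in partition "
                                        + partition
                                        + " and the reset strategy is NONE");
                }
            }
            result.put(partition, offset);
        }
        return result;
    }
}
```

With LATEST, a partition without a matching offset starts at its end offset, which matches the default behavior this description says is preserved.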
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - Added a unit test covering the various offset reset strategies and the edge case where the timestamp does not correspond to an offset in Kafka
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] LinMingQiang commented on pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

Posted by GitBox <gi...@apache.org>.
LinMingQiang commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1219465061

   I also encountered this exception. I read Kafka data in batches. Therefore, we want to use the latest offset when the timestamp is exceeded.
   
   
   




[GitHub] [flink] MartijnVisser commented on pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1285366698

   What's the status of this PR?




[GitHub] [flink] mas-chen commented on pull request #20370: [FLINK-28185][Connector/Kafka] handle missing timestamps when there are empty partitions for the TimestampOffsetsInitializer

Posted by GitBox <gi...@apache.org>.
mas-chen commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1290287200

   @MartijnVisser @PatrickRen sorry for the delay; the feedback should now be addressed. I will follow up with the public API change separately.




[GitHub] [flink] MartijnVisser merged pull request #20370: [FLINK-28185][Connector/Kafka] handle missing timestamps when there are empty partitions for the TimestampOffsetsInitializer

Posted by GitBox <gi...@apache.org>.
MartijnVisser merged PR #20370:
URL: https://github.com/apache/flink/pull/20370




[GitHub] [flink] PatrickRen commented on a diff in pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #20370:
URL: https://github.com/apache/flink/pull/20370#discussion_r991931482


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java:
##########
@@ -45,31 +52,43 @@ public Map<TopicPartition, Long> getPartitionOffsets(
         Map<TopicPartition, Long> startingTimestamps = new HashMap<>();
         Map<TopicPartition, Long> initialOffsets = new HashMap<>();
 
-        // First get the current end offsets of the partitions. This is going to be used
-        // in case we cannot find a suitable offsets based on the timestamp, i.e. the message
-        // meeting the requirement of the timestamp have not been produced to Kafka yet, in
-        // this case, we just use the latest offset.
-        // We need to get the latest offsets before querying offsets by time to ensure that
-        // no message is going to be missed.
-        Map<TopicPartition, Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);
+        final Map<TopicPartition, Long> defaultOffsets;

Review Comment:
    If we always reset to latest for invalid timestamps, this logic could remain unchanged.



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java:
##########
@@ -125,15 +125,35 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
      * initialized offset is the offset of the first record whose record timestamp is greater than
-     * or equals the give timestamp (milliseconds).
+     * or equals the give timestamp (milliseconds). If the timestamp does not correspond to any
+     * offset, the initializer will default to the latest offset.
      *
      * @param timestamp the timestamp (milliseconds) to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
      *     timestamp.
      * @see KafkaAdminClient#listOffsets(Map)
      */
     static OffsetsInitializer timestamp(long timestamp) {
-        return new TimestampOffsetsInitializer(timestamp);
+        return new TimestampOffsetsInitializer(timestamp, OffsetResetStrategy.LATEST);
+    }
+
+    /**
+     * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
+     * initialized offset is the offset of the first record whose record timestamp is greater than
+     * or equals the give timestamp (milliseconds). If the timestamp does not correspond to any
+     * offset, the initializer will default to the {@link OffsetResetStrategy} to find the offsets.
+     * For example, if {@link OffsetResetStrategy#NONE} is specified, then this implementation will
+     * throw an exception.
+     *
+     * @param timestamp the timestamp (milliseconds) to start the consumption.
+     * @param offsetResetStrategy the offset reset strategy to use when the timestamp does not
+     *     correspond an existing offset.
+     * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
+     *     timestamp.
+     * @see KafkaAdminClient#listOffsets(Map)
+     */
+    static OffsetsInitializer timestamp(long timestamp, OffsetResetStrategy offsetResetStrategy) {

Review Comment:
   I think we are introducing a new public API here, as `OffsetsInitializer` is marked as public evolving, which requires a discussion on the mailing list and a vote.
   
   I'm OK with keeping only the original `OffsetsInitializer.timestamp()` above, with the reset strategy defaulting to latest for now, since there is always the fallback that users can implement their own `OffsetsInitializer`, and this patch could then be applied to old versions. We can introduce the new API in a separate ticket, following the standard procedure for APIs.





[GitHub] [flink] mas-chen commented on a diff in pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20370:
URL: https://github.com/apache/flink/pull/20370#discussion_r994179681


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java:
##########
@@ -125,15 +125,35 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
      * initialized offset is the offset of the first record whose record timestamp is greater than
-     * or equals the give timestamp (milliseconds).
+     * or equals the give timestamp (milliseconds). If the timestamp does not correspond to any
+     * offset, the initializer will default to the latest offset.
      *
      * @param timestamp the timestamp (milliseconds) to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
      *     timestamp.
      * @see KafkaAdminClient#listOffsets(Map)
      */
     static OffsetsInitializer timestamp(long timestamp) {
-        return new TimestampOffsetsInitializer(timestamp);
+        return new TimestampOffsetsInitializer(timestamp, OffsetResetStrategy.LATEST);
+    }
+
+    /**
+     * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
+     * initialized offset is the offset of the first record whose record timestamp is greater than
+     * or equals the give timestamp (milliseconds). If the timestamp does not correspond to any
+     * offset, the initializer will default to the {@link OffsetResetStrategy} to find the offsets.
+     * For example, if {@link OffsetResetStrategy#NONE} is specified, then this implementation will
+     * throw an exception.
+     *
+     * @param timestamp the timestamp (milliseconds) to start the consumption.
+     * @param offsetResetStrategy the offset reset strategy to use when the timestamp does not
+     *     correspond an existing offset.
+     * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
+     *     timestamp.
+     * @see KafkaAdminClient#listOffsets(Map)
+     */
+    static OffsetsInitializer timestamp(long timestamp, OffsetResetStrategy offsetResetStrategy) {

Review Comment:
   Thanks for the review, I agree with your approach. 





[GitHub] [flink] mas-chen commented on pull request #20370: [FLINK-28185][Connector/Kafka] handle missing timestamps when there are empty partitions for the TimestampOffsetsInitializer

Posted by GitBox <gi...@apache.org>.
mas-chen commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1326818969

   @PatrickRen Thanks! Regarding your earlier comment about the public API change originally in this PR, I have filed https://issues.apache.org/jira/browse/FLINK-30200 and will follow up later.




[GitHub] [flink] flinkbot commented on pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1196114714

   ## CI report:
   
   * c584408092f780ceb187b5d90cb999a1d699b2ae UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] PatrickRen commented on a diff in pull request #20370: [FLINK-28185][Connector/Kafka] handle missing timestamps when there are empty partitions for the TimestampOffsetsInitializer

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #20370:
URL: https://github.com/apache/flink/pull/20370#discussion_r1031176628


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -612,6 +612,10 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
                                                             OffsetSpec.forTimestamp(
                                                                     entry.getValue()))))
                     .entrySet().stream()
+                    // OffsetAndTimestamp cannot be initialized with a negative offset, which is
+                    // possible if the timestamp does not correspond to an offset and the topic
+                    // partition is empty
+                    .filter(entry -> entry.getValue().offset() != -1)

Review Comment:
   nit: What about `entry.getValue().offset() >= 0`?
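
To illustrate the suggestion: a `>= 0` check drops every negative offset rather than only the `-1` sentinel. The sketch below is self-contained and uses hypothetical names (`NegativeOffsetFilterSketch`, `dropUnresolved`), with `String` keys and plain `Long` offsets standing in for the `TopicPartition` keys and the listed offset results in the diff above.

```java
import java.util.Map;
import java.util.stream.Collectors;

final class NegativeOffsetFilterSketch {

    /**
     * Keeps only partitions whose listed offset is a real (non-negative) offset.
     * A negative value means the timestamp did not correspond to any offset.
     */
    static Map<String, Long> dropUnresolved(Map<String, Long> listedOffsets) {
        return listedOffsets.entrySet().stream()
                // ">= 0" also rejects negative values other than -1.
                .filter(entry -> entry.getValue() >= 0)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

Partitions removed by the filter are simply absent from the result, so the caller has to treat a missing key as "no offset found" and apply its fallback.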



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -107,7 +111,19 @@ public void testTimestampOffsetsInitializer() {
                     long expectedOffset = tp.partition() > 2 ? tp.partition() : 3L;
                     assertThat((long) offset).isEqualTo(expectedOffset);
                 });
-        assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.NONE);
+        assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
+    }
+
+    @Test
+    public void testTimestampOffsetsInitializerForEmptyPartitions() {
+        OffsetsInitializer initializer = OffsetsInitializer.timestamp(2001);
+        List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(EMPTY_TOPIC3);
+        Map<TopicPartition, Long> expectedOffsets =
+                partitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
+        assertThat(initializer.getPartitionOffsets(partitions, retriever))
+                .as("offsets are equal equal to 0 since the timestamp is out of range.")

Review Comment:
   typo: "are equal ~~equal~~ to"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] mas-chen commented on pull request #20370: [FLINK-28185][Connector/Kafka] handle missing timestamps when there are empty partitions for the TimestampOffsetsInitializer

Posted by GitBox <gi...@apache.org>.
mas-chen commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1326012167

   @PatrickRen have you had a chance to take another look?




[GitHub] [flink] MartijnVisser commented on pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1261873888

   @mas-chen There are a couple of holidays coming up, but I've asked @PatrickRen for a review




[GitHub] [flink] mas-chen commented on pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on PR #20370:
URL: https://github.com/apache/flink/pull/20370#issuecomment-1202030682

   @PatrickRen can you help review? I can't request reviewers unfortunately :) 

