Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/11 08:04:30 UTC

[GitHub] [flink] PatrickRen commented on a diff in pull request #20370: [FLINK-28185][Connector/Kafka] Make TimestampOffsetsInitializer apply offset reset str…

PatrickRen commented on code in PR #20370:
URL: https://github.com/apache/flink/pull/20370#discussion_r991931482


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java:
##########
@@ -45,31 +52,43 @@ public Map<TopicPartition, Long> getPartitionOffsets(
         Map<TopicPartition, Long> startingTimestamps = new HashMap<>();
         Map<TopicPartition, Long> initialOffsets = new HashMap<>();
 
-        // First get the current end offsets of the partitions. This is going to be used
-        // in case we cannot find a suitable offsets based on the timestamp, i.e. the message
-        // meeting the requirement of the timestamp have not been produced to Kafka yet, in
-        // this case, we just use the latest offset.
-        // We need to get the latest offsets before querying offsets by time to ensure that
-        // no message is going to be missed.
-        Map<TopicPartition, Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);
+        final Map<TopicPartition, Long> defaultOffsets;

Review Comment:
    If we always reset to latest for timestamps that do not match any offset, this logic could remain unchanged.
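
For reference, the logic removed in this hunk can be sketched without any Flink or Kafka dependency. This is a minimal illustration, not the connector's code: `LatestFallbackSketch`, its `resolve` method, and the plain `String` keys standing in for `TopicPartition` are all assumptions made for the example. The caller is expected to capture the end offsets before running the timestamp lookup, so that a partition with no matching record falls back to an offset that cannot skip messages.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free model of the "fall back to latest" behaviour.
// String keys stand in for Kafka's TopicPartition.
final class LatestFallbackSketch {

    /**
     * @param endOffsets      end offsets, fetched BEFORE the timestamp lookup
     * @param offsetsForTimes result of the timestamp lookup; a partition is
     *                        absent (or mapped to null) when no record matches
     * @return the starting offset chosen for every partition in endOffsets
     */
    static Map<String, Long> resolve(
            Map<String, Long> endOffsets, Map<String, Long> offsetsForTimes) {
        Map<String, Long> initialOffsets = new HashMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            Long byTime = offsetsForTimes.get(entry.getKey());
            // Use the timestamp-based offset when one exists, else the end offset.
            initialOffsets.put(entry.getKey(), byTime != null ? byTime : entry.getValue());
        }
        return initialOffsets;
    }
}
```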



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java:
##########
@@ -125,15 +125,35 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
      * initialized offset is the offset of the first record whose record timestamp is greater than
-     * or equals the give timestamp (milliseconds).
+     * or equals the give timestamp (milliseconds). If the timestamp does not correspond to any
+     * offset, the initializer will default to the latest offset.
      *
      * @param timestamp the timestamp (milliseconds) to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
      *     timestamp.
      * @see KafkaAdminClient#listOffsets(Map)
      */
     static OffsetsInitializer timestamp(long timestamp) {
-        return new TimestampOffsetsInitializer(timestamp);
+        return new TimestampOffsetsInitializer(timestamp, OffsetResetStrategy.LATEST);
+    }
+
+    /**
+     * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
+     * initialized offset is the offset of the first record whose record timestamp is greater than
+     * or equals the give timestamp (milliseconds). If the timestamp does not correspond to any
+     * offset, the initializer will default to the {@link OffsetResetStrategy} to find the offsets.
+     * For example, if {@link OffsetResetStrategy#NONE} is specified, then this implementation will
+     * throw an exception.
+     *
+     * @param timestamp the timestamp (milliseconds) to start the consumption.
+     * @param offsetResetStrategy the offset reset strategy to use when the timestamp does not
+     *     correspond an existing offset.
+     * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
+     *     timestamp.
+     * @see KafkaAdminClient#listOffsets(Map)
+     */
+    static OffsetsInitializer timestamp(long timestamp, OffsetResetStrategy offsetResetStrategy) {

Review Comment:
   I think we are introducing a new public API here, since `OffsetsInitializer` is marked as `@PublicEvolving`. That requires a discussion on the mailing list and a vote.

   I'm OK with keeping only the original `OffsetsInitializer.timestamp()` above for now, with the reset strategy defaulting to latest. Users always have the fallback of implementing their own `OffsetsInitializer`, and this way the patch can also be applied to older versions. We can introduce the new API in a separate ticket, following the standard procedure for API changes.
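
To illustrate the fallback mentioned above, here is a rough sketch of the decision a user-written initializer would have to make. It deliberately avoids the real Flink and Kafka types so that it compiles on its own: `TimestampWithResetSketch`, the `Reset` enum, and the `String` partition keys are hypothetical stand-ins, not the connector's API. In an actual implementation the three input maps would come from the offsets retriever passed to the initializer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free model of a timestamp initializer with a
// configurable reset strategy. Not Flink's API; names are illustrative only.
final class TimestampWithResetSketch {

    // Stand-in for Kafka's OffsetResetStrategy.
    enum Reset { EARLIEST, LATEST, NONE }

    static Map<String, Long> resolve(
            Map<String, Long> byTimestamp,      // timestamp lookup; missing key = no match
            Map<String, Long> beginningOffsets, // earliest offset per partition
            Map<String, Long> endOffsets,       // latest offset per partition
            Reset reset) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : endOffsets.keySet()) {
            Long found = byTimestamp.get(partition);
            if (found != null) {
                result.put(partition, found);
                continue;
            }
            // No record at or after the timestamp: apply the reset strategy.
            switch (reset) {
                case EARLIEST:
                    result.put(partition, beginningOffsets.get(partition));
                    break;
                case LATEST:
                    result.put(partition, endOffsets.get(partition));
                    break;
                default:
                    throw new IllegalStateException(
                            "No offset found for timestamp in partition " + partition);
            }
        }
        return result;
    }
}
```

With `Reset.LATEST` this reduces to the default behaviour discussed in this review; `Reset.NONE` surfaces the missing offset as an exception instead of choosing one silently.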


