Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/05 00:30:32 UTC

[GitHub] [beam] damccorm opened a new issue, #21610: Support for reading Kafka topics from any startReadTime in Java

damccorm opened a new issue, #21610:
URL: https://github.com/apache/beam/issues/21610

   [https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198](https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198)
   
    
   
   Right now the `startReadTime` config for KafkaIO.Read looks up, in every topic partition, an offset whose timestamp is newer than or equal to that timestamp. The problem arises when the timestamp is so recent that a partition has no message at or after it: in that case the code fails with an exception. In some of these cases failing makes no sense, because we could actually make the read work.
   
   If we don't get an offset from calling `consumer.offsetsForTimes`, we should call `endOffsets`, and use the returned offset **** 1. That is actually the offset we will have to read next time.
   
   Even if `endOffsets` can't return an offset we could use 0 as the offset to read from.
   
    
   
   Am I missing something here? Is it okay to contribute this?
   
   Imported from Jira [BEAM-14518](https://issues.apache.org/jira/browse/BEAM-14518). Original Jira may contain additional context.
   Reported by: bnemeth.
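
The fallback order proposed in this issue can be sketched as follows. This is a minimal illustration, not Beam's implementation: `OffsetLookup` and `resolveStartOffset` are hypothetical names, and the interface only stands in for the `consumer.offsetsForTimes` and `endOffsets` calls so that the example runs without a Kafka client. The sketch assumes the end offset is already the next offset to be read and uses it unchanged.

```java
import java.util.OptionalLong;

final class StartOffsetFallback {

    /** Hypothetical stand-in for the two consumer lookups discussed in the issue. */
    interface OffsetLookup {
        /** Like consumer.offsetsForTimes: empty if no message is at or after the timestamp. */
        OptionalLong offsetForTime(long timestampMillis);

        /** Like consumer.endOffsets: empty if no end offset could be determined. */
        OptionalLong endOffset();
    }

    /** Tries the timestamp lookup first, then the end offset, then falls back to 0. */
    static long resolveStartOffset(OffsetLookup lookup, long timestampMillis) {
        OptionalLong byTime = lookup.offsetForTime(timestampMillis);
        if (byTime.isPresent()) {
            return byTime.getAsLong();
        }
        // Assumption: the end offset is used as-is; 0 is the last resort.
        return lookup.endOffset().orElse(0L);
    }
}
```

Whether falling back silently is acceptable depends on the pipeline; a caller that must not skip data may still prefer the exception.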


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on issue #21610: Support for reading Kafka topics from any startReadTime in Java

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on issue #21610:
URL: https://github.com/apache/beam/issues/21610#issuecomment-1180781893

   I'm not sure that simply using endOffsets would work in all cases. It may make sense to instead have the caller of this method try to handle getting a correct offset.




[GitHub] [beam] nbali commented on issue #21610: Support for reading Kafka topics from any startReadTime in Java

Posted by GitBox <gi...@apache.org>.
nbali commented on issue #21610:
URL: https://github.com/apache/beam/issues/21610#issuecomment-1208754414

   I'm not sure I will have time in the near future to implement this, but given how slowly the discussion went, I created a working solution for reading the whole Kafka stream in a batch pipeline. So, for whoever needs a quicker workaround that is even more customizable:
   ```java
   import java.time.Duration;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   import lombok.extern.slf4j.Slf4j; // Lombok, provides the `log` field used below
   import org.apache.beam.sdk.io.kafka.KafkaIO; // referenced from the Javadoc only
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
   import org.apache.kafka.common.TopicPartition;
   
   /**
    * Using {@link KafkaIO.Read#withStopReadTime(org.joda.time.Instant)} will try to acquire an offset for the given timestamp.<br>
    * If there are only older offsets than the provided timestamp the default implementation fails with an exception.<br>
    * This implementation falls back to the newest available offset instead - essentially reading till the newest available message.
    */
   @Slf4j
   public class MyKafkaConsumer<K, V> extends KafkaConsumer<K, V> {
       
       public MyKafkaConsumer(Map<String, Object> configs) {
           super(configs);
       }
       
       @Override
       public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
           final Map<TopicPartition, OffsetAndTimestamp> result = super.offsetsForTimes(timestampsToSearch, timeout);
           
           final List<TopicPartition> topicPartitionsWithoutProperOffset =
                   result.keySet().stream()
                           .filter(topicPartition -> result.get(topicPartition) == null)
                           .collect(Collectors.toList());
           
           endOffsets(topicPartitionsWithoutProperOffset).forEach((topicPartition, endOffset) -> {
               final Long timestampToSearch = timestampsToSearch.get(topicPartition);
               log.warn("Offset for topicPartition: {}, timestamp: {} was not found, replaced by endOffset: {}",
                       topicPartition, timestampToSearch, endOffset);
               result.put(topicPartition, new OffsetAndTimestamp(endOffset, timestampToSearch));
           });
           
           return result;
       }
       
   }
   ```
   
   ```java
   KafkaIO.readBytes()
   	.withStopReadTime(Instant.now())
   	.withConsumerFactoryFn(MyKafkaConsumer::new) // required for stopReadTime
   ```




[GitHub] [beam] nbali commented on issue #21610: Support for reading Kafka topics from any startReadTime in Java

Posted by GitBox <gi...@apache.org>.
nbali commented on issue #21610:
URL: https://github.com/apache/beam/issues/21610#issuecomment-1190838070

   So basically, support for a provider that calculates the offset to use when none is found, with the default provider being the current implementation that throws an exception?
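
One way to picture the provider idea from this comment, as a hedged sketch only: `MissingOffsetProvider`, `FAIL`, and `resolve` are hypothetical names and not part of KafkaIO, and the exception type used by the default is an assumption.

```java
import java.util.OptionalLong;

final class OffsetResolution {

    /** Hypothetical strategy invoked only when no offset matches the timestamp. */
    @FunctionalInterface
    interface MissingOffsetProvider {
        long offsetFor(String topic, int partition, long timestampMillis);

        /** Default provider: keeps the fail-fast behaviour (exception type assumed). */
        MissingOffsetProvider FAIL = (topic, partition, timestampMillis) -> {
            throw new IllegalArgumentException(
                "No offset at or after " + timestampMillis + " in " + topic + "-" + partition);
        };
    }

    /** Returns the found offset, or asks the provider when the lookup came back empty. */
    static long resolve(OptionalLong found, String topic, int partition,
                        long timestampMillis, MissingOffsetProvider provider) {
        return found.isPresent()
            ? found.getAsLong()
            : provider.offsetFor(topic, partition, timestampMillis);
    }
}
```

A caller that prefers a fallback could pass a lambda such as `(topic, partition, ts) -> 0L` instead of `FAIL`.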




[GitHub] [beam] johnjcasey commented on issue #21610: Support for reading Kafka topics from any startReadTime in Java

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on issue #21610:
URL: https://github.com/apache/beam/issues/21610#issuecomment-1205346003

   Essentially. This method is public, so we shouldn't change the general pattern here.
   I would add a new method that calls this one but falls back to the end offset if it gets the exception, and then use that utility in the relevant places in the Kafka readers.
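
The wrapper described here might look roughly like the sketch below. It is an illustration under assumptions: `offsetForTimeOrEnd` is a hypothetical name, the two functional parameters stand in for the existing public method and an end-offset lookup, and catching `IllegalArgumentException` assumes that is what the existing method throws when no offset is found.

```java
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;

final class OffsetForTimeOrEnd {

    /**
     * Calls the existing lookup unchanged and only falls back to the end offset
     * if that lookup fails, so the public method keeps its current contract.
     */
    static long offsetForTimeOrEnd(
            LongUnaryOperator existingOffsetForTime, // stand-in for the existing public method
            LongSupplier endOffset,                  // stand-in for an end-offset lookup
            long timestampMillis) {
        try {
            return existingOffsetForTime.applyAsLong(timestampMillis);
        } catch (IllegalArgumentException e) {
            // Assumed failure mode: no message at or after the timestamp.
            return endOffset.getAsLong();
        }
    }
}
```

Keeping the fallback in a separate utility means existing callers of the public method see no behaviour change.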

