Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/01/24 10:17:19 UTC

[GitHub] [pinot] richardstartin commented on a change in pull request #8057: #7250 Kafka consumption from a specific timestamp

richardstartin commented on a change in pull request #8057:
URL: https://github.com/apache/pinot/pull/8057#discussion_r790593249



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
##########
@@ -63,6 +69,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
         offset =
             _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
                 .get(_topicPartition);
+      } else if (offsetCriteria.isPeriod()) {
+        String offsetString = offsetCriteria.getOffsetString();
+        Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
+        TopicPartition tp = new TopicPartition(_topic, _partition);
+        _offsetsForTimes.put(tp, System.currentTimeMillis() - periodToMillis);

Review comment:
       Please use an injected `java.time.Clock` so that time can be controlled in tests. Injecting `Clock.systemUTC()` and calling `clock.millis()` is equivalent to calling `System.currentTimeMillis()`, but lets you substitute a fixed clock when testing.
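
       A minimal sketch of what this could look like, assuming the clock is passed in through a constructor. The class `PeriodStartResolver` and its methods are hypothetical names used only for illustration; they are not part of Pinot, and the period is taken as an already converted number of milliseconds.

```java
import java.time.Clock;

/** Hypothetical helper (not part of Pinot): computes "now minus period" from an injected clock. */
final class PeriodStartResolver {
  private final Clock _clock;

  PeriodStartResolver(Clock clock) {
    _clock = clock;
  }

  /** Production wiring: the system UTC clock reads the same wall-clock time as System.currentTimeMillis(). */
  static PeriodStartResolver usingSystemClock() {
    return new PeriodStartResolver(Clock.systemUTC());
  }

  /** Returns the epoch-millisecond timestamp that lies periodMillis before the clock's current time. */
  long startTimestampMillis(long periodMillis) {
    return _clock.millis() - periodMillis;
  }
}
```

       In a test, constructing the resolver with `Clock.fixed(Instant.ofEpochMilli(1_000_000L), ZoneOffset.UTC)` pins "now", so `startTimestampMillis(60_000L)` returns `940_000L` on every run.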




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


