Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/07/17 19:56:25 UTC

[GitHub] [pinot] sajjad-moradi commented on a diff in pull request #8911: add freshness lag when there are no consuming segments

sajjad-moradi commented on code in PR #8911:
URL: https://github.com/apache/pinot/pull/8911#discussion_r922888568


##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java:
##########
@@ -195,19 +198,35 @@ private DataTable processQueryInternal(ServerQueryRequest queryRequest, Executor
 
     // Gather stats for realtime consuming segments
     int numConsumingSegments = 0;
-    long minIndexTimeMs = Long.MAX_VALUE;
-    long minIngestionTimeMs = Long.MAX_VALUE;
-    for (IndexSegment indexSegment : indexSegments) {
-      if (indexSegment instanceof MutableSegment) {
-        numConsumingSegments += 1;
-        SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
-        long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
-        if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
-          minIndexTimeMs = indexTimeMs;
-        }
-        long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
-        if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs < minIngestionTimeMs) {
-          minIngestionTimeMs = ingestionTimeMs;
+    int numOnlineSegments = 0;
+    long minIndexTimeMs = 0;
+    long minIngestionTimeMs = 0;
+    long maxEndTimeMs = 0;
+    if (tableDataManager instanceof RealtimeTableDataManager) {
+      numConsumingSegments = 0;
+      numOnlineSegments = 0;
+      minIndexTimeMs = Long.MAX_VALUE;
+      minIngestionTimeMs = Long.MAX_VALUE;
+      maxEndTimeMs = Long.MIN_VALUE;
+      for (IndexSegment indexSegment : indexSegments) {
+        if (indexSegment instanceof MutableSegment) {
+          numConsumingSegments += 1;
+          SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+          long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
+          if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
+            minIndexTimeMs = indexTimeMs;
+          }
+          long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
+          if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs < minIngestionTimeMs) {
+            minIngestionTimeMs = ingestionTimeMs;
+          }
+        } else if (indexSegment instanceof ImmutableSegment) {
+          SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+          Interval timeInterval = segmentMetadata.getTimeInterval();
+          numOnlineSegments++;
+          if (timeInterval != null) {
+            maxEndTimeMs = Math.max(maxEndTimeMs, timeInterval.getEndMillis());

Review Comment:
   I was actually talking about immutable segments, not mutable ones!
   The values of the `startTimeStr` and `endTimeStr` variables are taken from the min and max values of the time column:
   ```java
   SegmentColumnarIndexCreator.writeMetadata() {
   ...
           if (_config.getStartTime() != null) {
             startTime = Long.parseLong(_config.getStartTime());
             endTime = Long.parseLong(_config.getEndTime());
             timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
           } else {
             if (_totalDocs > 0) {
   HERE->      String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
   HERE->      String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
   
               if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
                 // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
                 // Use DateTimeFormatter from DateTimeFormatSpec to handle default time zone consistently.
                 DateTimeFormatSpec formatSpec = _config.getDateTimeFormatSpec();
                 Preconditions.checkNotNull(formatSpec, "DateTimeFormatSpec must exist for SimpleDate");
                 DateTimeFormatter dateTimeFormatter = formatSpec.getDateTimeFormatter();
                 startTime = dateTimeFormatter.parseMillis(startTimeStr);
                 endTime = dateTimeFormatter.parseMillis(endTimeStr);
                 timeUnit = TimeUnit.MILLISECONDS;
               } else {
                 // by default, time column type is TimeColumnType.EPOCH
                 startTime = Long.parseLong(startTimeStr);
                 endTime = Long.parseLong(endTimeStr);
                 timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
               }
             } else {
               // No records in segment. Use current time as start/end
               long now = System.currentTimeMillis();
               if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
                 startTime = now;
                 endTime = now;
                 timeUnit = TimeUnit.MILLISECONDS;
               } else {
                 timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
                 startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
                 endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
               }
             }
           }
   
           if (!_config.isSkipTimeValueCheck()) {
             Interval timeInterval =
                 new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime), DateTimeZone.UTC);
             Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
                 "Invalid segment start/end time: %s (in millis: %s/%s) for time column: %s, must be between: %s",
                 timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), timeColumnName,
                 TimeUtils.VALID_TIME_INTERVAL);
           }
   
   HERE->  properties.setProperty(SEGMENT_START_TIME, startTime);
   HERE->  properties.setProperty(SEGMENT_END_TIME, endTime);
   
   ```
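To make the point concrete, here is a minimal sketch of the EPOCH branch quoted above: the segment's start and end times come from the min and max values of the time column, so they reflect event time rather than ingestion time. The class `SegmentTimeSketch`, the holder `TimeRange`, and the helper `fromEpochColumn` are hypothetical names used only for illustration; they are not part of Pinot.

```java
import java.util.concurrent.TimeUnit;

public class SegmentTimeSketch {

  /** Hypothetical holder for a segment's time range, in millis since epoch. */
  static final class TimeRange {
    final long startMillis;
    final long endMillis;

    TimeRange(long startMillis, long endMillis) {
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  /**
   * Hypothetical helper mirroring the EPOCH branch of the excerpt: the min and
   * max values of the time column are parsed as longs in the segment time unit,
   * then converted to millis, as the Interval construction in the excerpt does.
   */
  static TimeRange fromEpochColumn(String minStr, String maxStr, TimeUnit segmentTimeUnit) {
    long startTime = Long.parseLong(minStr);
    long endTime = Long.parseLong(maxStr);
    return new TimeRange(segmentTimeUnit.toMillis(startTime), segmentTimeUnit.toMillis(endTime));
  }

  public static void main(String[] args) {
    // Assumed example: a time column stored as days since epoch.
    TimeRange range = fromEpochColumn("19000", "19190", TimeUnit.DAYS);
    System.out.println(range.startMillis + " .. " + range.endMillis);
  }
}
```

Under this reading, a value such as `timeInterval.getEndMillis()` on an immutable segment is the latest event timestamp in the data, which can differ from the time the row was actually ingested.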



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

