Posted to commits@pinot.apache.org by jl...@apache.org on 2021/09/20 18:10:07 UTC

[pinot] branch master updated: Offset based realtime consumption status checker (#7267)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 53cfdf3  Offset based realtime consumption status checker (#7267)
53cfdf3 is described below

commit 53cfdf3c7bb97bd2d6624f2f9be9db195983c32b
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Mon Sep 20 11:09:45 2021 -0700

    Offset based realtime consumption status checker (#7267)
    
    * Add offset based realtime consumption status checker
    
    * Applied PR suggestions
    
    * One log line when consumption catches up
    
    * Return numConsumingSegmentsNotCaughtUp
    
    * Also add num of outstanding segments to timeout scenario
---
 .../apache/pinot/common/utils/ServiceStatus.java   |  44 +++-
 .../realtime/LLRealtimeSegmentDataManager.java     |  20 +-
 .../server/starter/helix/BaseServerStarter.java    |  15 +-
 .../helix/OffsetBasedConsumptionStatusChecker.java | 114 ++++++++++
 .../OffsetBasedConsumptionStatusCheckerTest.java   | 251 +++++++++++++++++++++
 .../apache/pinot/spi/stream/OffsetCriteria.java    |   3 +
 6 files changed, 427 insertions(+), 20 deletions(-)
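
In short, each consuming segment now records the latest offset available in its stream partition when the server starts up, and the new checker reports how many consuming segments have not yet ingested up to that recorded offset. A minimal, self-contained sketch of that idea follows (illustrative only; the class and names below are stand-ins, not the actual classes in this diff):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustrative sketch only: each consuming segment's "latest stream offset at startup"
    // snapshot is compared against its currently ingested offset; a segment that has caught
    // up once stays caught up.
    class OffsetCatchupSketch {
      private final Map<String, Long> _latestStreamOffsetAtStartup;
      private final Set<String> _caughtUpSegments = new HashSet<>();

      OffsetCatchupSketch(Map<String, Long> latestStreamOffsetAtStartup) {
        _latestStreamOffsetAtStartup = latestStreamOffsetAtStartup;
      }

      int getNumSegmentsNotCaughtUp(Map<String, Long> currentIngestedOffsets) {
        for (Map.Entry<String, Long> entry : _latestStreamOffsetAtStartup.entrySet()) {
          String segment = entry.getKey();
          Long ingested = currentIngestedOffsets.get(segment);
          if (!_caughtUpSegments.contains(segment) && ingested != null && ingested >= entry.getValue()) {
            _caughtUpSegments.add(segment);
          }
        }
        return _latestStreamOffsetAtStartup.size() - _caughtUpSegments.size();
      }
    }

The real OffsetBasedConsumptionStatusChecker in this diff does the same bookkeeping against LLRealtimeSegmentDataManager and treats segments that have already been committed as caught up.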

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index 06ee990..d50e399 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -204,28 +205,33 @@ public class ServiceStatus {
 
   /**
    * Service status callback that checks whether realtime consumption has caught up
-   * TODO: In this initial version, we are simply adding a configurable static wait time
-   * This can be made smarter:
-   * 1) Keep track of average consumption rate for table in server stats
-   * 2) Monitor consumption rate during startup, report GOOD when it stabilizes to average rate
-   * 3) Monitor consumption rate during startup, report GOOD if it is idle
+   * An offset-based consumption status checker is being added in two phases. The first phase adds the new status
+   * checker but does not apply its output; it only logs its behavior. Once that behavior has been analyzed and
+   * approved for tables with different consumption rates, we can safely start using the new status checker.
+   * (Another approach would be to add a new config that is disabled by default. Since this feature is not urgent,
+   * we decided not to introduce yet another config and went with this two-phase approach.)
    */
   public static class RealtimeConsumptionCatchupServiceStatusCallback implements ServiceStatusCallback {
 
     private final long _endWaitTime;
     private final Status _serviceStatus = Status.STARTING;
+    private final Supplier<Integer> _getNumConsumingSegmentsNotReachedTheirLatestOffset;
     String _statusDescription = STATUS_DESCRIPTION_INIT;
 
+    private boolean _consumptionNotYetCaughtUp = true;
+
     /**
      * Realtime consumption catchup service which adds a static wait time for consuming segments to catchup
      */
     public RealtimeConsumptionCatchupServiceStatusCallback(HelixManager helixManager, String clusterName,
-        String instanceName, long realtimeConsumptionCatchupWaitMs) {
+        String instanceName, long realtimeConsumptionCatchupWaitMs,
+        Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset) {
 
       // A consuming segment will actually be ready to serve queries after (time of creation of partition consumer) +
       // (configured max time to catchup)
       // We are approximating it to (time of server startup) + (configured max time to catch up)
       _endWaitTime = System.currentTimeMillis() + realtimeConsumptionCatchupWaitMs;
+      _getNumConsumingSegmentsNotReachedTheirLatestOffset = getNumConsumingSegmentsNotReachedTheirLatestOffset;
       LOGGER.info("Monitoring realtime consumption catchup. Will allow {} ms before marking status GOOD",
           realtimeConsumptionCatchupWaitMs);
     }
@@ -236,13 +242,27 @@ public class ServiceStatus {
         return _serviceStatus;
       }
       long now = System.currentTimeMillis();
-      if (now < _endWaitTime) {
-        _statusDescription =
-            String.format("Waiting for consuming segments to catchup, timeRemaining=%dms", _endWaitTime - now);
-        return Status.STARTING;
+      int numConsumingSegmentsNotCaughtUp = _getNumConsumingSegmentsNotReachedTheirLatestOffset.get();
+      if (now >= _endWaitTime) {
+        _statusDescription = String.format("Consuming segments status GOOD since %dms "
+            + "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, numConsumingSegmentsNotCaughtUp);
+        return Status.GOOD;
       }
-      _statusDescription = String.format("Consuming segments status GOOD since %dms", _endWaitTime);
-      return Status.GOOD;
+      if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp > 0) {
+        // TODO: Once the performance of offset based consumption checker is validated:
+        //      - remove the log line
+        //      - uncomment the status & statusDescription lines
+        //      - remove variable _consumptionNotYetCaughtUp
+        _consumptionNotYetCaughtUp = false;
+        LOGGER.info("All consuming segments have reached their latest offsets! "
+            + "Finished {} msec earlier than time threshold.", _endWaitTime - now);
+//      _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset";
+//      return Status.GOOD;
+      }
+      _statusDescription =
+          String.format("Waiting for consuming segments to catchup: numConsumingSegmentsNotCaughtUp=%d, "
+              + "timeRemaining=%dms", numConsumingSegmentsNotCaughtUp, _endWaitTime - now);
+      return Status.STARTING;
     }
 
     @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 72aaf15..a36935d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -72,6 +72,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.metrics.PinotMeter;
 import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -287,6 +288,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final boolean _nullHandlingEnabled;
   private final SegmentCommitterFactory _segmentCommitterFactory;
 
+  private volatile StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime = null;
+
   // TODO each time this method is called, we print reason for stop. Good to print only once.
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
@@ -763,11 +766,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     return _lastLogTime;
   }
 
-  @VisibleForTesting
-  protected StreamPartitionMsgOffset getCurrentOffset() {
+  public StreamPartitionMsgOffset getCurrentOffset() {
     return _currentOffset;
   }
 
+  public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
+    return _latestStreamOffsetAtStartupTime;
+  }
+
   @VisibleForTesting
   protected SegmentBuildDescriptor getSegmentBuildDescriptor() {
     return _segmentBuildDescriptor;
@@ -1364,6 +1370,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     _state = State.INITIAL_CONSUMING;
 
+    // fetch latest stream offset
+    try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
+        .createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
+      _latestStreamOffsetAtStartupTime = metadataProvider
+          .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
+    } catch (Exception e) {
+      _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
+          _partitionGroupId, e);
+    }
+
     long now = now();
     _consumeStartTime = now;
     long maxConsumeTimeMillis = _partitionLevelStreamConfig.getFlushThresholdTimeMillis();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index e58d020..9f964d5 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -215,8 +215,8 @@ public abstract class BaseServerStarter implements ServiceStartable {
 
     // collect all resources which have this instance in the ideal state
     List<String> resourcesToMonitor = new ArrayList<>();
-    // if even 1 resource has this instance in ideal state with state CONSUMING, set this to true
-    boolean foundConsuming = false;
+
+    Set<String> consumingSegments = new HashSet<>();
     boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0;
 
     for (String resourceName : _helixAdmin.getResourcesInCluster(_helixClusterName)) {
@@ -235,12 +235,11 @@ public abstract class BaseServerStarter implements ServiceStartable {
             break;
           }
         }
-        if (checkRealtime && !foundConsuming && TableNameBuilder.isRealtimeTableResource(resourceName)) {
+        if (checkRealtime && TableNameBuilder.isRealtimeTableResource(resourceName)) {
           for (String partitionName : idealState.getPartitionSet()) {
             if (StateModel.SegmentStateModel.CONSUMING
                 .equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
-              foundConsuming = true;
-              break;
+              consumingSegments.add(partitionName);
             }
           }
         }
@@ -255,10 +254,14 @@ public abstract class BaseServerStarter implements ServiceStartable {
     serviceStatusCallbackListBuilder.add(
         new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _helixClusterName,
             _instanceId, resourcesToMonitor, minResourcePercentForStartup));
+    boolean foundConsuming = !consumingSegments.isEmpty();
     if (checkRealtime && foundConsuming) {
+      OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
+          new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
       serviceStatusCallbackListBuilder.add(
           new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
-              _instanceId, realtimeConsumptionCatchupWaitMs));
+              _instanceId, realtimeConsumptionCatchupWaitMs,
+              consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
     }
     LOGGER.info("Registering service status handler");
     ServiceStatus.setServiceStatusCallback(_instanceId,
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
new file mode 100644
index 0000000..e67eacd
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.server.starter.helix;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is used at startup time to get a more accurate estimate of the catch-up period, during which no query
+ * execution happens and consumers try to catch up to the latest messages available in the streams.
+ * To achieve this, every time the status check - {@link #getNumConsumingSegmentsNotReachedTheirLatestOffset} - is
+ * called, we check for each consuming segment whether its latest ingested offset has reached the latest stream
+ * offset that was fetched once at startup time.
+ */
+public class OffsetBasedConsumptionStatusChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OffsetBasedConsumptionStatusChecker.class);
+
+  // constructor parameters
+  private final InstanceDataManager _instanceDataManager;
+  private final Set<String> _consumingSegments;
+
+  // helper variable
+  private final Set<String> _caughtUpSegments = new HashSet<>();
+
+  public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments) {
+    _instanceDataManager = instanceDataManager;
+    _consumingSegments = consumingSegments;
+  }
+
+  public int getNumConsumingSegmentsNotReachedTheirLatestOffset() {
+    for (String segName : _consumingSegments) {
+      if (_caughtUpSegments.contains(segName)) {
+        continue;
+      }
+      TableDataManager tableDataManager = getTableDataManager(segName);
+      if (tableDataManager == null) {
+        LOGGER.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName);
+        continue;
+      }
+      SegmentDataManager segmentDataManager = null;
+      try {
+        segmentDataManager = tableDataManager.acquireSegment(segName);
+        if (segmentDataManager == null) {
+          LOGGER
+              .info("SegmentDataManager is not yet set up for segment {}. Will check consumption status later", segName);
+          continue;
+        }
+        if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) {
+          // There's a possibility that a consuming segment has converted to a committed segment. If that's the case,
+          // segment data manager will not be of type LLRealtime.
+          LOGGER.info("Segment {} is already committed and is considered caught up.", segName);
+          _caughtUpSegments.add(segName);
+          continue;
+        }
+        LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
+        StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset();
+        StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
+        if (latestStreamOffset == null || latestIngestedOffset == null) {
+          LOGGER.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. "
+              + "Will check consumption status later", segName, latestStreamOffset, latestIngestedOffset);
+          continue;
+        }
+        if (latestIngestedOffset.compareTo(latestStreamOffset) < 0) {
+          LOGGER.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ",
+              latestIngestedOffset, segName, latestStreamOffset);
+          continue;
+        }
+        LOGGER.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", segName,
+            latestIngestedOffset, latestStreamOffset);
+        _caughtUpSegments.add(segName);
+      } finally {
+        if (segmentDataManager != null) {
+          tableDataManager.releaseSegment(segmentDataManager);
+        }
+      }
+    }
+    return _consumingSegments.size() - _caughtUpSegments.size();
+  }
+
+  private TableDataManager getTableDataManager(String segmentName) {
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    String tableName = llcSegmentName.getTableName();
+    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    return _instanceDataManager.getTableDataManager(tableNameWithType);
+  }
+}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
new file mode 100644
index 0000000..be03eef
--- /dev/null
+++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class OffsetBasedConsumptionStatusCheckerTest {
+
+  @Test
+  public void regularCase() {
+
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    OffsetBasedConsumptionStatusChecker statusChecker =
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+    when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+    when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+    when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+
+    //           latest ingested offset    latest stream offset
+    // segA0              10                       15
+    // segA1              100                      150
+    // segB0              1000                     1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    //           latest ingested offset     latest stream offset
+    // segA0              20                       15
+    // segA1              200                      150
+    // segB0              2000                     1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+  }
+
+  @Test
+  public void dataManagersBeingSetup() {
+
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+
+    OffsetBasedConsumptionStatusChecker statusChecker =
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+    // TableDataManager is not set up yet
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup some SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+
+    //           latest ingested offset    latest stream offset
+    // segA0               10                     15
+    // segA1               100                    150
+    // segB0           not setup yet              1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+    when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+    when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    // setup the remaining SegmentDataManager
+    LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    //           latest ingested offset     latest stream offset
+    // segA0               20                      15
+    // segA1               200                     150
+    // segB0               1000                    1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+    when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+
+    //           latest ingested offset     latest stream offset
+    // segA0               30                      15
+    // segA1               300                     150
+    // segB0               2000                    1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(30));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+  }
+
+  @Test
+  public void segmentsBeingCommitted() {
+
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    OffsetBasedConsumptionStatusChecker statusChecker =
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    //           latest ingested offset    latest stream offset
+    // segA0              10                       15
+    // segA1              100                      150
+    // segB0              1000                     1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+    when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+    when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+    when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    // segB0 is now committed; ImmutableSegmentDataManager is returned by table data manager
+    ImmutableSegmentDataManager immSegMngrB0 = mock(ImmutableSegmentDataManager.class);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(immSegMngrB0);
+
+    //           latest ingested offset     latest stream offset
+    // segA0              20                        15
+    // segA1              200                       150
+    // segB0        committed at 1200               1500
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+  }
+
+  @Test
+  public void cannotGetLatestStreamOffset() {
+
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    OffsetBasedConsumptionStatusChecker statusChecker =
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+    // setup TableDataManagers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+    // setup SegmentDataManagers
+    LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+    LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+    when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    //           latest ingested offset    latest stream offset
+    // segA0              10                       15
+    // segA1              100                      150
+    // segB0              1000                     null - could not get the latest offset from stream at startup
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+    when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+    when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+    when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(null);
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+    //           latest ingested offset     latest stream offset
+    // segA0              20                        15
+    // segA1              200                       150
+    // segB0              2000                      null - could not get the latest offset from stream at startup
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+
+    //           latest ingested offset     latest stream offset
+    // segA0              30                        15
+    // segA1              300                       150
+    // segB0              3000                      null - could not get the latest offset from stream at startup
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(30));
+    when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300));
+    when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(3000));
+    assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+  }
+}
\ No newline at end of file
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
index e0d7166..14cb6a4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
@@ -31,6 +31,9 @@ public class OffsetCriteria {
   public static final OffsetCriteria SMALLEST_OFFSET_CRITERIA =
       new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest();
 
+  public static final OffsetCriteria LARGEST_OFFSET_CRITERIA =
+      new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest();
+
   /**
    * Enumerates the supported offset types
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org