You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/09/20 18:10:07 UTC
[pinot] branch master updated: Offset based realtime consumption
status checker (#7267)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 53cfdf3 Offset based realtime consumption status checker (#7267)
53cfdf3 is described below
commit 53cfdf3c7bb97bd2d6624f2f9be9db195983c32b
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Mon Sep 20 11:09:45 2021 -0700
Offset based realtime consumption status checker (#7267)
* Add offset based realtime consumption status checker
* Applied PR suggestions
* One log line when consumption catches up
* Return numConsumingSegmentsNotCaughtUp
* Also add num of outstanding segments to timeout scenario
---
.../apache/pinot/common/utils/ServiceStatus.java | 44 +++-
.../realtime/LLRealtimeSegmentDataManager.java | 20 +-
.../server/starter/helix/BaseServerStarter.java | 15 +-
.../helix/OffsetBasedConsumptionStatusChecker.java | 114 ++++++++++
.../OffsetBasedConsumptionStatusCheckerTest.java | 251 +++++++++++++++++++++
.../apache/pinot/spi/stream/OffsetCriteria.java | 3 +
6 files changed, 427 insertions(+), 20 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index 06ee990..d50e399 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
@@ -204,28 +205,33 @@ public class ServiceStatus {
/**
* Service status callback that checks whether realtime consumption has caught up
- * TODO: In this initial version, we are simply adding a configurable static wait time
- * This can be made smarter:
- * 1) Keep track of average consumption rate for table in server stats
- * 2) Monitor consumption rate during startup, report GOOD when it stabilizes to average rate
- * 3) Monitor consumption rate during startup, report GOOD if it is idle
+ * An offset based consumption status checker is being added in two phases. First phase adds the new status checker,
+ * but it doesn't apply its output. Instead it only logs its behavior. When the behavior is analysed and approved
+ * for different tables with different consumption rates, we can safely use the new status checker.
+ * (Another approach would be to define a new config and disable it by default. Since this feature is not urgent,
+ * we decided to not define yet another config and go with this two phase approach)
*/
public static class RealtimeConsumptionCatchupServiceStatusCallback implements ServiceStatusCallback {
private final long _endWaitTime;
private final Status _serviceStatus = Status.STARTING;
+ private final Supplier<Integer> _getNumConsumingSegmentsNotReachedTheirLatestOffset;
String _statusDescription = STATUS_DESCRIPTION_INIT;
+ private boolean _consumptionNotYetCaughtUp = true;
+
/**
* Realtime consumption catchup service which adds a static wait time for consuming segments to catch up
*/
public RealtimeConsumptionCatchupServiceStatusCallback(HelixManager helixManager, String clusterName,
- String instanceName, long realtimeConsumptionCatchupWaitMs) {
+ String instanceName, long realtimeConsumptionCatchupWaitMs,
+ Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset) {
// A consuming segment will actually be ready to serve queries after (time of creation of partition consumer) +
// (configured max time to catchup)
// We are approximating it to (time of server startup) + (configured max time to catch up)
_endWaitTime = System.currentTimeMillis() + realtimeConsumptionCatchupWaitMs;
+ _getNumConsumingSegmentsNotReachedTheirLatestOffset = getNumConsumingSegmentsNotReachedTheirLatestOffset;
LOGGER.info("Monitoring realtime consumption catchup. Will allow {} ms before marking status GOOD",
realtimeConsumptionCatchupWaitMs);
}
@@ -236,13 +242,27 @@ public class ServiceStatus {
return _serviceStatus;
}
long now = System.currentTimeMillis();
- if (now < _endWaitTime) {
- _statusDescription =
- String.format("Waiting for consuming segments to catchup, timeRemaining=%dms", _endWaitTime - now);
- return Status.STARTING;
+ int numConsumingSegmentsNotCaughtUp = _getNumConsumingSegmentsNotReachedTheirLatestOffset.get();
+ if (now >= _endWaitTime) {
+ _statusDescription = String.format("Consuming segments status GOOD since %dms "
+ + "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, numConsumingSegmentsNotCaughtUp);
+ return Status.GOOD;
}
- _statusDescription = String.format("Consuming segments status GOOD since %dms", _endWaitTime);
- return Status.GOOD;
+ if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp > 0) {
+ // TODO: Once the performance of offset based consumption checker is validated:
+ // - remove the log line
+ // - uncomment the status & statusDescription lines
+ // - remove variable _consumptionNotYetCaughtUp
+ _consumptionNotYetCaughtUp = false;
+ LOGGER.info("All consuming segments have reached their latest offsets! "
+ + "Finished {} msec earlier than time threshold.", _endWaitTime - now);
+// _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset";
+// return Status.GOOD;
+ }
+ _statusDescription =
+ String.format("Waiting for consuming segments to catchup: numConsumingSegmentsNotCaughtUp=%d, "
+ + "timeRemaining=%dms", numConsumingSegmentsNotCaughtUp, _endWaitTime - now);
+ return Status.STARTING;
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 72aaf15..a36935d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -72,6 +72,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -287,6 +288,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final boolean _nullHandlingEnabled;
private final SegmentCommitterFactory _segmentCommitterFactory;
+ private volatile StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime = null;
+
// TODO each time this method is called, we print reason for stop. Good to print only once.
private boolean endCriteriaReached() {
Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
@@ -763,11 +766,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
return _lastLogTime;
}
- @VisibleForTesting
- protected StreamPartitionMsgOffset getCurrentOffset() {
+ public StreamPartitionMsgOffset getCurrentOffset() {
return _currentOffset;
}
+ public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
+ return _latestStreamOffsetAtStartupTime;
+ }
+
@VisibleForTesting
protected SegmentBuildDescriptor getSegmentBuildDescriptor() {
return _segmentBuildDescriptor;
@@ -1364,6 +1370,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
_state = State.INITIAL_CONSUMING;
+ // fetch latest stream offset
+ try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
+ .createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
+ _latestStreamOffsetAtStartupTime = metadataProvider
+ .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
+ } catch (Exception e) {
+ _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
+ _partitionGroupId);
+ }
+
long now = now();
_consumeStartTime = now;
long maxConsumeTimeMillis = _partitionLevelStreamConfig.getFlushThresholdTimeMillis();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index e58d020..9f964d5 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -215,8 +215,8 @@ public abstract class BaseServerStarter implements ServiceStartable {
// collect all resources which have this instance in the ideal state
List<String> resourcesToMonitor = new ArrayList<>();
- // if even 1 resource has this instance in ideal state with state CONSUMING, set this to true
- boolean foundConsuming = false;
+
+ Set<String> consumingSegments = new HashSet<>();
boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0;
for (String resourceName : _helixAdmin.getResourcesInCluster(_helixClusterName)) {
@@ -235,12 +235,11 @@ public abstract class BaseServerStarter implements ServiceStartable {
break;
}
}
- if (checkRealtime && !foundConsuming && TableNameBuilder.isRealtimeTableResource(resourceName)) {
+ if (checkRealtime && TableNameBuilder.isRealtimeTableResource(resourceName)) {
for (String partitionName : idealState.getPartitionSet()) {
if (StateModel.SegmentStateModel.CONSUMING
.equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
- foundConsuming = true;
- break;
+ consumingSegments.add(partitionName);
}
}
}
@@ -255,10 +254,14 @@ public abstract class BaseServerStarter implements ServiceStartable {
serviceStatusCallbackListBuilder.add(
new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _helixClusterName,
_instanceId, resourcesToMonitor, minResourcePercentForStartup));
+ boolean foundConsuming = !consumingSegments.isEmpty();
if (checkRealtime && foundConsuming) {
+ OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
+ new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
serviceStatusCallbackListBuilder.add(
new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
- _instanceId, realtimeConsumptionCatchupWaitMs));
+ _instanceId, realtimeConsumptionCatchupWaitMs,
+ consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
}
LOGGER.info("Registering service status handler");
ServiceStatus.setServiceStatusCallback(_instanceId,
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
new file mode 100644
index 0000000..e67eacd
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.server.starter.helix;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is used at startup time to have a more accurate estimate of the catchup period in which no query execution
+ * happens and consumers try to catch up to the latest messages available in streams.
+ * To achieve this, every time status check is called - {@link #getNumConsumingSegmentsNotReachedTheirLatestOffset} -
+ * for each consuming segment, we check if segment's latest ingested offset has reached the latest stream offset that's
+ * fetched once at startup time.
+ */
+public class OffsetBasedConsumptionStatusChecker {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OffsetBasedConsumptionStatusChecker.class);
+
+ // constructor parameters
+ private final InstanceDataManager _instanceDataManager;
+ private final Set<String> _consumingSegments;
+
+ // helper variable
+ private final Set<String> _caughtUpSegments = new HashSet<>();
+
+ public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments) {
+ _instanceDataManager = instanceDataManager;
+ _consumingSegments = consumingSegments;
+ }
+
+ public int getNumConsumingSegmentsNotReachedTheirLatestOffset() {
+ for (String segName : _consumingSegments) {
+ if (_caughtUpSegments.contains(segName)) {
+ continue;
+ }
+ TableDataManager tableDataManager = getTableDataManager(segName);
+ if (tableDataManager == null) {
+ LOGGER.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName);
+ continue;
+ }
+ SegmentDataManager segmentDataManager = null;
+ try {
+ segmentDataManager = tableDataManager.acquireSegment(segName);
+ if (segmentDataManager == null) {
+ LOGGER
+ .info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", segName);
+ continue;
+ }
+ if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) {
+ // There's a possibility that a consuming segment has converted to a committed segment. If that's the case,
+ // segment data manager will not be of type LLRealtime.
+ LOGGER.info("Segment {} is already committed and is considered caught up.", segName);
+ _caughtUpSegments.add(segName);
+ continue;
+ }
+ LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
+ StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset();
+ StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
+ if (latestStreamOffset == null || latestIngestedOffset == null) {
+ LOGGER.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. "
+ + "Will check consumption status later", segName, latestStreamOffset, latestIngestedOffset);
+ continue;
+ }
+ if (latestIngestedOffset.compareTo(latestStreamOffset) < 0) {
+ LOGGER.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ",
+ latestIngestedOffset, segName, latestStreamOffset);
+ continue;
+ }
+ LOGGER.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", segName,
+ latestIngestedOffset, latestStreamOffset);
+ _caughtUpSegments.add(segName);
+ } finally {
+ if (segmentDataManager != null) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
+ }
+ }
+ return _consumingSegments.size() - _caughtUpSegments.size();
+ }
+
+ private TableDataManager getTableDataManager(String segmentName) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ String tableName = llcSegmentName.getTableName();
+ String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ return _instanceDataManager.getTableDataManager(tableNameWithType);
+ }
+}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
new file mode 100644
index 0000000..be03eef
--- /dev/null
+++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class OffsetBasedConsumptionStatusCheckerTest {
+
+ @Test
+ public void regularCase() {
+
+ String segA0 = "tableA__0__0__123Z";
+ String segA1 = "tableA__1__0__123Z";
+ String segB0 = "tableB__0__0__123Z";
+ Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+ OffsetBasedConsumptionStatusChecker statusChecker =
+ new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+ // setup TableDataManagers
+ TableDataManager tableDataManagerA = mock(TableDataManager.class);
+ TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+ when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+ // setup SegmentDataManagers
+ LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+ when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+ when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+ when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+ when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+ when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+ when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+
+ // latest ingested offset latest stream offset
+ // segA0 10 15
+ // segA1 100 150
+ // segB0 1000 1500
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+ // latest ingested offset latest stream offset
+ // segA0 20 15
+ // segA1 200 150
+ // segB0 2000 1500
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+ }
+
+ @Test
+ public void dataMangersBeingSetup() {
+
+ String segA0 = "tableA__0__0__123Z";
+ String segA1 = "tableA__1__0__123Z";
+ String segB0 = "tableB__0__0__123Z";
+ Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+
+ OffsetBasedConsumptionStatusChecker statusChecker =
+ new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+ // TableDataManager is not set up yet
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+ // setup TableDataManagers
+ TableDataManager tableDataManagerA = mock(TableDataManager.class);
+ TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+ when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+ // setup some SegmentDataManagers
+ LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+ when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+ when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+
+ // latest ingested offset latest stream offset
+ // segA0 10 15
+ // segA1 100 150
+ // segB0 not setup yet 1500
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+ when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+ when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+ // setup the remaining SegmentDataManager
+ LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+ when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+ // latest ingested offset latest stream offset
+ // segA0 20 15
+ // segA1 200 150
+ // segB0 1000 1500
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+ when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+
+ // latest ingested offset latest stream offset
+ // segA0 30 15
+ // segA1 300 150
+ // segB0 2000 1500
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(30));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+ }
+
+ @Test
+ public void segmentsBeingCommitted() {
+
+ String segA0 = "tableA__0__0__123Z";
+ String segA1 = "tableA__1__0__123Z";
+ String segB0 = "tableB__0__0__123Z";
+ Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+ OffsetBasedConsumptionStatusChecker statusChecker =
+ new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+ // setup TableDataManagers
+ TableDataManager tableDataManagerA = mock(TableDataManager.class);
+ TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+ when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+ // setup SegmentDataManagers
+ LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+ when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+ when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+ when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+ // latest ingested offset latest stream offset
+ // segA0 10 15
+ // segA1 100 150
+ // segB0 1000 1500
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+ when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+ when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+ when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+ // segB0 is now committed; ImmutableSegmentDataManager is returned by table data manager
+ ImmutableSegmentDataManager immSegMngrB0 = mock(ImmutableSegmentDataManager.class);
+ when(tableDataManagerB.acquireSegment(segB0)).thenReturn(immSegMngrB0);
+
+ // latest ingested offset latest stream offset
+ // segA0 20 15
+ // segA1 200 150
+ // segB0 committed at 1200 1500
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0);
+ }
+
+ @Test
+ public void cannotGetLatestStreamOffset() {
+
+ String segA0 = "tableA__0__0__123Z";
+ String segA1 = "tableA__1__0__123Z";
+ String segB0 = "tableB__0__0__123Z";
+ Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+ OffsetBasedConsumptionStatusChecker statusChecker =
+ new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments);
+
+ // setup TableDataManagers
+ TableDataManager tableDataManagerA = mock(TableDataManager.class);
+ TableDataManager tableDataManagerB = mock(TableDataManager.class);
+ when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+ when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+ // setup SegmentDataManagers
+ LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class);
+ when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+ when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+ when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+ // latest ingested offset latest stream offset
+ // segA0 10 15
+ // segA1 100 150
+ // segB0 1000 null - could not get the latest offset from stream at startup
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
+ when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15));
+ when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150));
+ when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(null);
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3);
+
+ // latest ingested offset latest stream offset
+ // segA0 20 15
+ // segA1 200 150
+ // segB0 2000 null - could not get the latest offset from stream at startup
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+
+ // latest ingested offset latest stream offset
+ // segA0 30 15
+ // segA1 300 150
+ // segB0 3000 null - could not get the latest offset from stream at startup
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(30));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(3000));
+ assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
+ }
+}
\ No newline at end of file
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
index e0d7166..14cb6a4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java
@@ -31,6 +31,9 @@ public class OffsetCriteria {
public static final OffsetCriteria SMALLEST_OFFSET_CRITERIA =
new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest();
+ public static final OffsetCriteria LARGEST_OFFSET_CRITERIA =
+ new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest();
+
/**
* Enumerates the supported offset types
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org