You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2021/11/15 20:06:17 UTC
[pinot] branch master updated: Add config for enabling realtime offset based consumption status checker (#7753)
This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6611ff8 Add config for enabling realtime offset based consumption status checker (#7753)
6611ff8 is described below
commit 6611ff82f46ea837e6bd8edefd46ccc16d4e2002
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Mon Nov 15 12:05:32 2021 -0800
Add config for enabling realtime offset based consumption status checker (#7753)
---
.../apache/pinot/common/utils/ServiceStatus.java | 21 ++++++++-------------
.../server/starter/helix/BaseServerStarter.java | 16 ++++++++++++----
.../org/apache/pinot/spi/utils/CommonConstants.java | 3 +++
3 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index d50e399..45221fb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -218,8 +218,6 @@ public class ServiceStatus {
private final Supplier<Integer> _getNumConsumingSegmentsNotReachedTheirLatestOffset;
String _statusDescription = STATUS_DESCRIPTION_INIT;
- private boolean _consumptionNotYetCaughtUp = true;
-
/**
* Realtime consumption catchup service which adds a static wait time for consuming segments to catchup
*/
@@ -242,22 +240,19 @@ public class ServiceStatus {
return _serviceStatus;
}
long now = System.currentTimeMillis();
- int numConsumingSegmentsNotCaughtUp = _getNumConsumingSegmentsNotReachedTheirLatestOffset.get();
+ boolean isConsumingSegmentsCounterProvided = _getNumConsumingSegmentsNotReachedTheirLatestOffset != null;
+ int numConsumingSegmentsNotCaughtUp =
+ isConsumingSegmentsCounterProvided ? _getNumConsumingSegmentsNotReachedTheirLatestOffset.get() : -1;
if (now >= _endWaitTime) {
_statusDescription = String.format("Consuming segments status GOOD since %dms "
+ "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, numConsumingSegmentsNotCaughtUp);
return Status.GOOD;
}
- if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp > 0) {
- // TODO: Once the performance of offset based consumption checker is validated:
- // - remove the log line
- // - uncomment the status & statusDescription lines
- // - remove variable _consumptionNotYetCaughtUp
- _consumptionNotYetCaughtUp = false;
- LOGGER.info("All consuming segments have reached their latest offsets! "
- + "Finished {} msec earlier than time threshold.", _endWaitTime - now);
-// _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset";
-// return Status.GOOD;
+ if (isConsumingSegmentsCounterProvided && numConsumingSegmentsNotCaughtUp == 0) {
+ _statusDescription = String.format(
+ "Consuming segments status GOOD as all consuming segments have reached the latest offset. "
+ + "Finished %d msec earlier than time threshold.", _endWaitTime - now);
+ return Status.GOOD;
}
_statusDescription =
String.format("Waiting for consuming segments to catchup: numConsumingSegmentsNotCaughtUp=%d, "
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index fe010ba..aaa1e2a 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixAdmin;
@@ -221,6 +222,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
int realtimeConsumptionCatchupWaitMs = _serverConf
.getProperty(Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS,
Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS);
+ boolean isOffsetBasedConsumptionStatusCheckerEnabled = _serverConf
+ .getProperty(Server.CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER,
+ Server.DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER);
// collect all resources which have this instance in the ideal state
List<String> resourcesToMonitor = new ArrayList<>();
@@ -265,12 +269,16 @@ public abstract class BaseServerStarter implements ServiceStartable {
_instanceId, resourcesToMonitor, minResourcePercentForStartup));
boolean foundConsuming = !consumingSegments.isEmpty();
if (checkRealtime && foundConsuming) {
- OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
- new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
+ Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset = null;
+ if (isOffsetBasedConsumptionStatusCheckerEnabled) {
+ OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
+ new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
+ getNumConsumingSegmentsNotReachedTheirLatestOffset =
+ consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset;
+ }
serviceStatusCallbackListBuilder.add(
new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
- _instanceId, realtimeConsumptionCatchupWaitMs,
- consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
+ _instanceId, realtimeConsumptionCatchupWaitMs, getNumConsumingSegmentsNotReachedTheirLatestOffset));
}
LOGGER.info("Registering service status handler");
ServiceStatus.setServiceStatusCallback(_instanceId,
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1402417..58c175e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -296,6 +296,9 @@ public class CommonConstants {
public static final String CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS =
"pinot.server.starter.realtimeConsumptionCatchupWaitMs";
public static final int DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = 0;
+ public static final String CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER =
+ "pinot.server.starter.enableRealtimeOffsetBasedConsumptionStatusChecker";
+ public static final boolean DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER = false;
public static final String DEFAULT_READ_MODE = "mmap";
// Whether to reload consuming segment on scheme update
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org