You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text export.
Posted to commits@pinot.apache.org by mc...@apache.org on 2021/11/15 20:06:17 UTC

[pinot] branch master updated: Add config for enabling realtime offset based consumption status checker (#7753)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6611ff8  Add config for enabling realtime offset based consumption status checker (#7753)
6611ff8 is described below

commit 6611ff82f46ea837e6bd8edefd46ccc16d4e2002
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Mon Nov 15 12:05:32 2021 -0800

    Add config for enabling realtime offset based consumption status checker (#7753)
---
 .../apache/pinot/common/utils/ServiceStatus.java    | 21 ++++++++-------------
 .../server/starter/helix/BaseServerStarter.java     | 16 ++++++++++++----
 .../org/apache/pinot/spi/utils/CommonConstants.java |  3 +++
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index d50e399..45221fb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -218,8 +218,6 @@ public class ServiceStatus {
     private final Supplier<Integer> _getNumConsumingSegmentsNotReachedTheirLatestOffset;
     String _statusDescription = STATUS_DESCRIPTION_INIT;
 
-    private boolean _consumptionNotYetCaughtUp = true;
-
     /**
      * Realtime consumption catchup service which adds a static wait time for consuming segments to catchup
      */
@@ -242,22 +240,19 @@ public class ServiceStatus {
         return _serviceStatus;
       }
       long now = System.currentTimeMillis();
-      int numConsumingSegmentsNotCaughtUp = _getNumConsumingSegmentsNotReachedTheirLatestOffset.get();
+      boolean isConsumingSegmentsCounterProvided = _getNumConsumingSegmentsNotReachedTheirLatestOffset != null;
+      int numConsumingSegmentsNotCaughtUp =
+          isConsumingSegmentsCounterProvided ? _getNumConsumingSegmentsNotReachedTheirLatestOffset.get() : -1;
       if (now >= _endWaitTime) {
         _statusDescription = String.format("Consuming segments status GOOD since %dms "
             + "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, numConsumingSegmentsNotCaughtUp);
         return Status.GOOD;
       }
-      if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp > 0) {
-        // TODO: Once the performance of offset based consumption checker is validated:
-        //      - remove the log line
-        //      - uncomment the status & statusDescription lines
-        //      - remove variable _consumptionNotYetCaughtUp
-        _consumptionNotYetCaughtUp = false;
-        LOGGER.info("All consuming segments have reached their latest offsets! "
-            + "Finished {} msec earlier than time threshold.", _endWaitTime - now);
-//      _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset";
-//      return Status.GOOD;
+      if (isConsumingSegmentsCounterProvided && numConsumingSegmentsNotCaughtUp == 0) {
+        _statusDescription = String.format(
+            "Consuming segments status GOOD as all consuming segments have reached the latest offset. "
+                + "Finished %d msec earlier than time threshold.", _endWaitTime - now);
+        return Status.GOOD;
       }
       _statusDescription =
           String.format("Waiting for consuming segments to catchup: numConsumingSegmentsNotCaughtUp=%d, "
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index fe010ba..aaa1e2a 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixAdmin;
@@ -221,6 +222,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
     int realtimeConsumptionCatchupWaitMs = _serverConf
         .getProperty(Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS,
             Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS);
+    boolean isOffsetBasedConsumptionStatusCheckerEnabled = _serverConf
+        .getProperty(Server.CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER,
+            Server.DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER);
 
     // collect all resources which have this instance in the ideal state
     List<String> resourcesToMonitor = new ArrayList<>();
@@ -265,12 +269,16 @@ public abstract class BaseServerStarter implements ServiceStartable {
             _instanceId, resourcesToMonitor, minResourcePercentForStartup));
     boolean foundConsuming = !consumingSegments.isEmpty();
     if (checkRealtime && foundConsuming) {
-      OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
-          new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
+      Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset = null;
+      if (isOffsetBasedConsumptionStatusCheckerEnabled) {
+        OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
+            new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
+        getNumConsumingSegmentsNotReachedTheirLatestOffset =
+            consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset;
+      }
       serviceStatusCallbackListBuilder.add(
           new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
-              _instanceId, realtimeConsumptionCatchupWaitMs,
-              consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
+              _instanceId, realtimeConsumptionCatchupWaitMs, getNumConsumingSegmentsNotReachedTheirLatestOffset));
     }
     LOGGER.info("Registering service status handler");
     ServiceStatus.setServiceStatusCallback(_instanceId,
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1402417..58c175e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -296,6 +296,9 @@ public class CommonConstants {
     public static final String CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS =
         "pinot.server.starter.realtimeConsumptionCatchupWaitMs";
     public static final int DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = 0;
+    public static final String CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER =
+        "pinot.server.starter.enableRealtimeOffsetBasedConsumptionStatusChecker";
+    public static final boolean DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER = false;
 
     public static final String DEFAULT_READ_MODE = "mmap";
     // Whether to reload consuming segment on scheme update

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org