You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/19 00:11:56 UTC

[incubator-pinot] 01/01: Add startup/shutdown checks for HelixServerStarter

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch server_restart
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 630245ac5effc08ba0f8669dd2de034d70634cb5
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Sat May 18 17:05:44 2019 -0700

    Add startup/shutdown checks for HelixServerStarter
    
    Optional startup checks:
      - Service status check (ON by default)
    Optional shutdown checks:
      - Query check (drains and finishes existing queries, ON by default)
      - Resource check (wait for all resources OFFLINE, OFF by default)
    
    BACKWARD-INCOMPATIBLE:
    The following config keys are removed or replaced; the new default
    behavior should be the desired one without them:
      - pinot.server.starter.enableSegmentsLoadingCheck
      - pinot.server.starter.timeoutInSeconds
      - pinot.server.instance.enable.default.columns
      - pinot.server.instance.enable.shutdown.delay
      - pinot.server.instance.starter.maxShutdownWaitTime
      - pinot.server.instance.starter.checkIntervalTime
---
 .../apache/pinot/common/utils/CommonConstants.java |  44 +-
 .../pinot/integration/tests/ClusterTest.java       |   7 +-
 .../server/starter/helix/HelixServerStarter.java   | 443 +++++++++------------
 3 files changed, 230 insertions(+), 264 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 7542cc5..ca6a80f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -171,13 +171,8 @@ public class CommonConstants {
     public static final String CONFIG_OF_REQUEST_HANDLER_FACTORY_CLASS = "pinot.server.requestHandlerFactory.class";
     public static final String CONFIG_OF_NETTY_PORT = "pinot.server.netty.port";
     public static final String CONFIG_OF_ADMIN_API_PORT = "pinot.server.adminapi.port";
-    public static final String CONFIG_OF_STARTER_ENABLE_SEGMENTS_LOADING_CHECK =
-        "pinot.server.starter.enableSegmentsLoadingCheck";
-    public static final String CONFIG_OF_STARTER_TIMEOUT_IN_SECONDS = "pinot.server.starter.timeoutInSeconds";
 
     public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = "pinot.server.instance.segment.format.version";
-    public static final String CONFIG_OF_ENABLE_DEFAULT_COLUMNS = "pinot.server.instance.enable.default.columns";
-    public static final String CONFIG_OF_ENABLE_SHUTDOWN_DELAY = "pinot.server.instance.enable.shutdown.delay";
     public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit";
     public static final String CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA =
         "pinot.server.instance.enable.commitend.metadata";
@@ -186,10 +181,6 @@ public class CommonConstants {
         "pinot.server.instance.realtime.alloc.offheap.direct";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.server.storage.factory";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "pinot.server.crypter";
-    public static final String CONFIG_OF_INSTANCE_MAX_SHUTDOWN_WAIT_TIME =
-        "pinot.server.instance.starter.maxShutdownWaitTime";
-    public static final String CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME =
-        "pinot.server.instance.starter.checkIntervalTime";
     // Configuration to consider the server ServiceStatus as being STARTED if the percent of resources (tables) that
     // are ONLINE for this this server has crossed the threshold percentage of the total number of tables
     // that it is expected to serve.
@@ -198,8 +189,6 @@ public class CommonConstants {
     public static final double DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
 
     public static final int DEFAULT_ADMIN_API_PORT = 8097;
-    public static final boolean DEFAULT_STARTER_ENABLE_SEGMENTS_LOADING_CHECK = false;
-    public static final int DEFAULT_STARTER_TIMEOUT_IN_SECONDS = 600;
     public static final String DEFAULT_READ_MODE = "heap";
     public static final String DEFAULT_INSTANCE_BASE_DIR =
         System.getProperty("java.io.tmpdir") + File.separator + "PinotServer";
@@ -214,10 +203,37 @@ public class CommonConstants {
     public static final String DEFAULT_REQUEST_HANDLER_FACTORY_CLASS =
         "org.apache.pinot.server.request.SimpleRequestHandlerFactory";
     public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY = "pinot.server.segment.fetcher";
-    public static final String DEFAULT_STAR_TREE_FORMAT_VERSION = "OFF_HEAP";
+
+    // Configs for server starter startup/shutdown checks
+    // Startup: timeout for the startup checks
+    public static final String CONFIG_OF_STARTUP_TIMEOUT_MS = "pinot.server.startup.timeoutMs";
+    public static final long DEFAULT_STARTUP_TIMEOUT_MS = 600_000L;
+    // Startup: enable service status check before claiming server up
+    public static final String CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK =
+        "pinot.server.startup.enableServiceStatusCheck";
+    public static final boolean DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK = true;
+    public static final String CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS =
+        "pinot.server.startup.serviceStatusCheckIntervalMs";
+    public static final long DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS = 10_000L;
+    // Shutdown: timeout for the shutdown checks
+    public static final String CONFIG_OF_SHUTDOWN_TIMEOUT_MS = "pinot.server.shutdown.timeoutMs";
+    public static final long DEFAULT_SHUTDOWN_TIMEOUT_MS = 600_000L;
+    // Shutdown: enable query check before shutting down the server
+    //           Will wait until no new query received and existing queries finished
+    public static final String CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK = "pinot.server.shutdown.enableQueryCheck";
+    public static final boolean DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK = true;
+    // Shutdown: threshold to mark that there is no new query received, use max query time as the default threshold
+    public static final String CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS = "pinot.server.shutdown.noQueryThresholdMs";
+    // Shutdown: enable resource check before shutting down the server
+    //           Will wait until all the resources in the external view are neither ONLINE nor CONSUMING
+    //           No need to enable this check if startup service status check is enabled
+    public static final String CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK = "pinot.server.shutdown.enableResourceCheck";
+    public static final boolean DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK = false;
+    public static final String CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS =
+        "pinot.server.shutdown.resourceCheckIntervalMs";
+    public static final long DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS = 10_000L;
+
     public static final String DEFAULT_COLUMN_MIN_MAX_VALUE_GENERATOR_MODE = "TIME";
-    public static final long DEFAULT_MAX_SHUTDOWN_WAIT_TIME_MS = 600_000L;
-    public static final long DEFAULT_CHECK_INTERVAL_TIME_MS = 60_000L;
 
     public static class SegmentCompletionProtocol {
       public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = "pinot.server.segment.uploader";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f0580a0..d4de2f9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -137,8 +137,7 @@ public abstract class ClusterTest extends ControllerTest {
     Configuration configuration = DefaultHelixStarterServerConfig.loadDefaultServerConf();
     configuration.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, LOCAL_HOST);
     configuration.setProperty(Server.CONFIG_OF_SEGMENT_FORMAT_VERSION, "v3");
-    configuration.addProperty(Server.CONFIG_OF_ENABLE_DEFAULT_COLUMNS, true);
-    configuration.setProperty(Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY, false);
+    configuration.setProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, false);
     return configuration;
   }
 
@@ -163,15 +162,13 @@ public abstract class ClusterTest extends ControllerTest {
   protected void startServers(int numServers, Configuration configuration, int baseAdminApiPort, int baseNettyPort,
       String zkStr) {
     try {
+      overrideServerConf(configuration);
       for (int i = 0; i < numServers; i++) {
         configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
         configuration
             .setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
         configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
         configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
-        // Set check interval time to 5 seconds for cluster tests.
-        configuration.setProperty(Server.CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME, 5_000L);
-        overrideServerConf(configuration);
         _serverStarters.add(new HelixServerStarter(_clusterName, zkStr, configuration));
       }
     } catch (Exception e) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index a026593..59453f1 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.server.starter.helix;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,29 +26,23 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.CurrentState;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.Utils;
@@ -62,7 +55,6 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.ServiceStatus.Status;
-import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.apache.pinot.server.conf.ServerConf;
@@ -72,61 +64,57 @@ import org.apache.pinot.server.starter.ServerInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Server.*;
+
 
 /**
- * Single server helix starter. Will start automatically with an untagged box.
- * Will auto join current cluster as a participant.
- *
- *
- *
+ * Starter for Helix-based Pinot server.
+ * <p>When an instance starts for the first time, it will automatically join the Helix cluster with the default tag.
+ * <ul>
+ *   <li>
+ *     Optional start-up checks:
+ *     <ul>
+ *       <li>Service status check (ON by default)</li>
+ *     </ul>
+ *   </li>
+ *   <li>
+ *     Optional shut-down checks:
+ *     <ul>
+ *       <li>Query check (drains and finishes existing queries, ON by default)</li>
+ *       <li>Resource check (wait for all resources OFFLINE, OFF by default)</li>
+ *     </ul>
+ *   </li>
+ * </ul>
  */
 public class HelixServerStarter {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixServerStarter.class);
 
   private final String _helixClusterName;
-  private final Configuration _helixServerConfig;
+  private final Configuration _serverConf;
   private final String _instanceId;
-  private final long _maxQueryTimeMs;
-  private final long _maxShutdownWaitTimeMs;
-  private final long _checkIntervalTimeMs;
   private final HelixManager _helixManager;
   private final HelixAdmin _helixAdmin;
   private final ServerInstance _serverInstance;
   private final AdminApiApplication _adminApiApplication;
   private final String _zkServers;
 
-  public HelixServerStarter(String helixClusterName, String zkServer, Configuration helixServerConfig)
+  public HelixServerStarter(String helixClusterName, String zkServer, Configuration serverConf)
       throws Exception {
     LOGGER.info("Starting Pinot server");
+    long startTimeMs = System.currentTimeMillis();
     _helixClusterName = helixClusterName;
 
     // Make a clone so that changes to the config won't propagate to the caller
-    _helixServerConfig = ConfigurationUtils.cloneConfiguration(helixServerConfig);
+    _serverConf = ConfigurationUtils.cloneConfiguration(serverConf);
 
-    if (_helixServerConfig.containsKey(CommonConstants.Server.CONFIG_OF_INSTANCE_ID)) {
-      _instanceId = _helixServerConfig.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID);
+    if (_serverConf.containsKey(CONFIG_OF_INSTANCE_ID)) {
+      _instanceId = _serverConf.getString(CONFIG_OF_INSTANCE_ID);
     } else {
-      String host =
-          _helixServerConfig.getString(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, NetUtil.getHostAddress());
-      int port = _helixServerConfig
+      String host = _serverConf.getString(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, NetUtil.getHostAddress());
+      int port = _serverConf
           .getInt(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
       _instanceId = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + host + "_" + port;
-      _helixServerConfig.addProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_ID, _instanceId);
-    }
-
-    _maxQueryTimeMs = _helixServerConfig.getLong(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT,
-        CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
-    _maxShutdownWaitTimeMs = _helixServerConfig
-        .getLong(CommonConstants.Server.CONFIG_OF_INSTANCE_MAX_SHUTDOWN_WAIT_TIME,
-            CommonConstants.Server.DEFAULT_MAX_SHUTDOWN_WAIT_TIME_MS);
-    long checkIntervalTimeMs = _helixServerConfig.getLong(CommonConstants.Server.CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME,
-        CommonConstants.Server.DEFAULT_CHECK_INTERVAL_TIME_MS);
-    if (checkIntervalTimeMs <= 0L) {
-      _checkIntervalTimeMs = CommonConstants.Server.DEFAULT_CHECK_INTERVAL_TIME_MS;
-      LOGGER.warn("Cannot set check interval time to non-positive value. Using the default setting: {}ms",
-          _checkIntervalTimeMs);
-    } else {
-      _checkIntervalTimeMs = checkIntervalTimeMs;
+      _serverConf.addProperty(CONFIG_OF_INSTANCE_ID, _instanceId);
     }
 
     LOGGER.info("Connecting Helix components");
@@ -135,7 +123,6 @@ public class HelixServerStarter {
     _zkServers = zkServer.replaceAll("\\s+", "");
     _helixManager =
         HelixManagerFactory.getZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
-    final StateMachineEngine stateMachineEngine = _helixManager.getStateMachineEngine();
     _helixManager.connect();
     _helixAdmin = _helixManager.getClusterManagmentTool();
     addInstanceTagIfNeeded(helixClusterName, _instanceId);
@@ -143,31 +130,30 @@ public class HelixServerStarter {
 
     LOGGER.info("Starting server instance");
     Utils.logVersions();
-    ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_helixServerConfig);
+    ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
     // Need to do this before we start receiving state transitions.
-    ServerSegmentCompletionProtocolHandler.init(_helixServerConfig.subset(
-        CommonConstants.Server.SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
+    ServerSegmentCompletionProtocolHandler
+        .init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
     _serverInstance = new ServerInstance();
     _serverInstance.init(serverInstanceConfig, propertyStore);
     _serverInstance.start();
 
     // Register state model factory
     SegmentFetcherAndLoader fetcherAndLoader =
-        new SegmentFetcherAndLoader(_helixServerConfig, _serverInstance.getInstanceDataManager(), propertyStore);
+        new SegmentFetcherAndLoader(_serverConf, _serverInstance.getInstanceDataManager(), propertyStore);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, _serverInstance.getInstanceDataManager(),
             fetcherAndLoader, propertyStore);
-    stateMachineEngine
+    _helixManager.getStateMachineEngine()
         .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
 
     // Start restlet server for admin API endpoint
-    int adminApiPort = _helixServerConfig
-        .getInt(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
+    int adminApiPort = _serverConf.getInt(CONFIG_OF_ADMIN_API_PORT, DEFAULT_ADMIN_API_PORT);
     _adminApiApplication = new AdminApiApplication(_serverInstance);
     _adminApiApplication.start(adminApiPort);
     setAdminApiPort(adminApiPort);
 
-    final ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
+    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     // Register message handler factory
     SegmentMessageHandlerFactory messageHandlerFactory =
         new SegmentMessageHandlerFactory(fetcherAndLoader, _serverInstance.getInstanceDataManager(), serverMetrics);
@@ -180,9 +166,8 @@ public class HelixServerStarter {
         .addPreConnectCallback(() -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
 
     // Register the service status handler
-    double minResourcePercentForStartup = _helixServerConfig
-        .getDouble(CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START,
-            CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
+    double minResourcePercentForStartup = _serverConf
+        .getDouble(CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START, DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
     ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
         .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _helixClusterName,
                 _instanceId, minResourcePercentForStartup),
@@ -191,7 +176,11 @@ public class HelixServerStarter {
 
     ControllerLeaderLocator.create(_helixManager);
 
-    waitForAllSegmentsLoaded();
+    if (_serverConf
+        .getBoolean(CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK, DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
+      long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_STARTUP_TIMEOUT_MS, DEFAULT_STARTUP_TIMEOUT_MS);
+      startupServiceStatusCheck(endTimeMs);
+    }
     setShuttingDownStatus(false);
     LOGGER.info("Pinot server ready");
 
@@ -203,102 +192,6 @@ public class HelixServerStarter {
     serverMetrics.addCallbackGauge("memory.allocationFailureCount", PinotDataBuffer::getAllocationFailureCount);
   }
 
-  private void waitForAllSegmentsLoaded() {
-    if (_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_STARTER_ENABLE_SEGMENTS_LOADING_CHECK,
-        CommonConstants.Server.DEFAULT_STARTER_ENABLE_SEGMENTS_LOADING_CHECK)) {
-      long startTime = System.currentTimeMillis();
-      int serverStarterTimeout = _helixServerConfig.getInt(CommonConstants.Server.CONFIG_OF_STARTER_TIMEOUT_IN_SECONDS,
-          CommonConstants.Server.DEFAULT_STARTER_TIMEOUT_IN_SECONDS);
-      long endTime = startTime + TimeUnit.SECONDS.toMillis(serverStarterTimeout);
-      boolean allSegmentsLoaded = false;
-      while (System.currentTimeMillis() < endTime) {
-        long timeToSleep = Math.min(TimeUnit.MILLISECONDS.toSeconds(endTime - System.currentTimeMillis()),
-            10 /* Sleep 10 seconds as default*/);
-        if (ServiceStatus.getServiceStatus() == Status.GOOD) {
-          LOGGER.info("All the segments are fully loaded into Pinot server, time taken: {} seconds",
-              TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
-          allSegmentsLoaded = true;
-          break;
-        }
-        try {
-          int numSegmentsLoaded = getNumSegmentLoaded();
-          int numSegmentsToLoad = getNumSegmentsToLoad();
-          LOGGER.warn("Waiting for all segments to be loaded, current progress: [ {} / {} ], sleep {} seconds...",
-              numSegmentsLoaded, numSegmentsToLoad, timeToSleep);
-          Thread.sleep(TimeUnit.SECONDS.toMillis(timeToSleep));
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return;
-        } catch (Exception e) {
-          LOGGER.warn("Caught exception during waiting for segments loading...", e);
-        }
-      }
-      if (!allSegmentsLoaded) {
-        LOGGER.info("Segments are not fully loaded within {} seconds...", serverStarterTimeout);
-        logSegmentsLoadingInfo();
-      }
-    }
-  }
-
-  private int getNumSegmentLoaded() {
-    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    if (instanceDataManager == null) {
-      return -1;
-    }
-
-    List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
-    int numSegmentsLoaded = 0;
-    for (String tableName : tableNames) {
-      numSegmentsLoaded += instanceDataManager.getAllSegmentsMetadata(tableName).size();
-    }
-    return numSegmentsLoaded;
-  }
-
-  private int getNumSegmentsToLoad() {
-    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    if (instanceDataManager == null) {
-      return -1;
-    }
-
-    HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    int numSegmentsToLoad = 0;
-    List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
-    for (String tableName : tableNames) {
-      LiveInstance liveInstance = helixDataAccessor.getProperty(keyBuilder.liveInstance(_instanceId));
-      String sessionId = liveInstance.getSessionId();
-      PropertyKey currentStateKey = keyBuilder.currentState(_instanceId, sessionId, tableName);
-      CurrentState currentState = helixDataAccessor.getProperty(currentStateKey);
-      if (currentState != null && currentState.isValid()) {
-        numSegmentsToLoad += currentState.getPartitionStateMap().size();
-      }
-    }
-    return numSegmentsToLoad;
-  }
-
-  private void logSegmentsLoadingInfo() {
-    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    if (instanceDataManager == null) {
-      return;
-    }
-    HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    LiveInstance liveInstance = helixDataAccessor.getProperty(keyBuilder.liveInstance(_instanceId));
-    String sessionId = liveInstance.getSessionId();
-    List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
-    for (String tableName : tableNames) {
-      PropertyKey currentStateKey = keyBuilder.currentState(_instanceId, sessionId, tableName);
-      CurrentState currentState = helixDataAccessor.getProperty(currentStateKey);
-      int numSegmentsLoaded = instanceDataManager.getAllSegmentsMetadata(tableName).size();
-      if (currentState != null && currentState.isValid()) {
-        int numSegmentsToLoad = currentState.getPartitionStateMap().size();
-        LOGGER.info(
-            "Segments are not fully loaded during server bootstrap, current progress: table: {}, segments loading progress [ {} / {} ]",
-            tableName, numSegmentsLoaded, numSegmentsToLoad);
-      }
-    }
-  }
-
   private void setAdminApiPort(int adminApiPort) {
     Map<String, String> propToUpdate = new HashMap<>();
     propToUpdate.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort));
@@ -337,12 +230,48 @@ public class HelixServerStarter {
     // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
     // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
     // non-positive value, so set the default value as 1.
-    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _helixServerConfig
+    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _serverConf
         .getString(CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS,
             CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
   }
 
+  /**
+   * Polls the service status until it turns GOOD, failing fast if it turns BAD, or until the deadline passes.
+   * <p>On timeout or interruption this method only logs a warning and returns normally.
+   * @param endTimeMs Absolute deadline in epoch millis, compared against {@link System#currentTimeMillis()}
+   * @throws IllegalStateException if the service status turns BAD before the deadline
+   */
+  private void startupServiceStatusCheck(long endTimeMs) {
+    LOGGER.info("Starting startup service status check");
+    long startTimeMs = System.currentTimeMillis();
+    long checkIntervalMs = _serverConf
+        .getLong(CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS, DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS);
+    while (System.currentTimeMillis() < endTimeMs) {
+      Status serviceStatus = ServiceStatus.getServiceStatus();
+      long currentTimeMs = System.currentTimeMillis();
+      if (serviceStatus == Status.GOOD) {
+        LOGGER.info("Service status is GOOD after {}ms", currentTimeMs - startTimeMs);
+        return;
+      } else if (serviceStatus == Status.BAD) {
+        throw new IllegalStateException("Service status is BAD");
+      }
+      // The deadline may pass between the loop check and currentTimeMs; Thread.sleep() rejects negative values
+      try {
+        Thread.sleep(Math.max(0L, Math.min(checkIntervalMs, endTimeMs - currentTimeMs)));
+      } catch (InterruptedException e) {
+        LOGGER.warn("Got interrupted while checking service status", e);
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+    LOGGER.warn("Service status has not turned GOOD within {}ms: {}", System.currentTimeMillis() - startTimeMs,
+        ServiceStatus.getStatusDescription());
+  }
+
   public void stop() {
+    LOGGER.info("Shutting down Pinot server");
+    long startTimeMs = System.currentTimeMillis();
+
     try {
       LOGGER.info("Closing PinotFS classes");
       PinotFSFactory.shutdown();
@@ -352,135 +281,159 @@ public class HelixServerStarter {
     _adminApiApplication.stop();
     setShuttingDownStatus(true);
 
-    // Total waiting time should include max query time.
-    final long endTime = _maxShutdownWaitTimeMs + System.currentTimeMillis();
-    if (_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY, true)) {
-      Uninterruptibles.sleepUninterruptibly(_maxQueryTimeMs, TimeUnit.MILLISECONDS);
+    long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_SHUTDOWN_TIMEOUT_MS, DEFAULT_SHUTDOWN_TIMEOUT_MS);
+    if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
+      shutdownQueryCheck(endTimeMs);
     }
-    waitUntilNoIncomingQueries(System.currentTimeMillis(), endTime);
     _helixManager.disconnect();
     _serverInstance.shutDown();
-    waitUntilNoOnlineResources(System.currentTimeMillis(), endTime);
-  }
-
-  private void waitUntilNoIncomingQueries(long startTime, final long endTime) {
-    if (startTime >= endTime) {
-      LOGGER.warn("Skip waiting until no incoming queries.");
-      return;
+    if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
+      shutdownResourceCheck(endTimeMs);
     }
-    LOGGER.info("Waiting upto {}ms until Pinot server doesn't receive any incoming queries...", (endTime - startTime));
-    long currentTime = startTime;
+  }
 
-    while (currentTime < endTime) {
-      if (noIncomingQueries(currentTime)) {
-        LOGGER.info("No incoming query within {}ms. Total waiting Time: {}ms", _checkIntervalTimeMs,
-            (currentTime - startTime));
-        return;
+  /**
+   * When shutting down the server, drains the queries and waits for all the existing queries to be finished.
+   *
+   * @param endTimeMs Deadline for draining the queries (epoch millis)
+   */
+  private void shutdownQueryCheck(long endTimeMs) {
+    LOGGER.info("Starting shutdown query check");
+    long startTimeMs = System.currentTimeMillis();
+    long maxQueryTimeMs = _serverConf.getLong(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+    long noQueryThresholdMs = _serverConf.getLong(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs);
+    // Drain queries: wait until no query has been received within the last noQueryThresholdMs
+    boolean queriesDrained = false;
+    long currentTimeMs;
+    while ((currentTimeMs = System.currentTimeMillis()) < endTimeMs) {
+      long latestQueryTimeMs = _serverInstance.getLatestQueryTime();
+      if (currentTimeMs >= latestQueryTimeMs + noQueryThresholdMs) {
+        LOGGER.info("Finished draining queries (no query received within {}ms) after {}ms",
+            currentTimeMs - latestQueryTimeMs, currentTimeMs - startTimeMs);
+        queriesDrained = true;
+        break;
       }
-
       try {
-        Thread.sleep(Math.min(_maxQueryTimeMs, (endTime - currentTime)));
+        // Sleep until the no-query window after the latest query elapses (always > 0 here), capped by the deadline
+        Thread.sleep(Math.min(latestQueryTimeMs + noQueryThresholdMs - currentTimeMs, endTimeMs - currentTimeMs));
       } catch (InterruptedException e) {
-        LOGGER.error("Interrupted when waiting for Pinot server not to receive any queries.", e);
+        LOGGER.error("Got interrupted while draining queries", e);
         Thread.currentThread().interrupt();
-        return;
+        break;
+      }
+    }
+    if (queriesDrained) {
+      // Ensure all the existing queries are finished (this wait is not bounded by endTimeMs)
+      long latestQueryFinishTimeMs = _serverInstance.getLatestQueryTime() + maxQueryTimeMs;
+      if (latestQueryFinishTimeMs > currentTimeMs) {
+        try {
+          Thread.sleep(latestQueryFinishTimeMs - currentTimeMs);
+        } catch (InterruptedException e) {
+          LOGGER.warn("Got interrupted while waiting for all the existing queries to be finished", e);
+          Thread.currentThread().interrupt();
+        }
       }
-      currentTime = System.currentTimeMillis();
+    } else {
+      LOGGER.warn("Failed to drain queries within {}ms", System.currentTimeMillis() - startTimeMs);
     }
-    LOGGER.error("Reach timeout when waiting for no incoming queries! Max waiting time: {}ms", _maxShutdownWaitTimeMs);
   }
 
   /**
-   * Init a helix spectator to watch the external view updates.
+   * When shutting down the server, waits for all the resources to turn OFFLINE (i.e. none of the partitions served by
+   * the server is ONLINE or CONSUMING).
+   *
+   * @param endTimeMs Deadline for the check (epoch millis)
    */
-  private void waitUntilNoOnlineResources(long startTime, final long endTime) {
-    if (startTime >= endTime) {
-      LOGGER.warn("Skip waiting until no online resources.");
+  private void shutdownResourceCheck(long endTimeMs) {
+    LOGGER.info("Starting shutdown resource check");
+    long startTimeMs = System.currentTimeMillis();
+    if (startTimeMs >= endTimeMs) {
+      LOGGER.warn("Skipping shutdown resource check because shutdown timeout is already reached");
       return;
     }
-    LOGGER.info("Waiting upto {}ms until no online resources...", (endTime - startTime));
 
-    // Initialize a helix spectator.
-    HelixManager spectatorManager =
-        HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
+    HelixAdmin helixAdmin = null;
     try {
-      spectatorManager.connect();
-
-      Set<String> resources = fetchLatestTableResources(spectatorManager.getClusterManagmentTool());
+      helixAdmin = new ZKHelixAdmin(_zkServers);
+
+      // Monitor all enabled table resources that the server serves
+      Set<String> resourcesToMonitor = new HashSet<>();
+      for (String resourceName : helixAdmin.getResourcesInCluster(_helixClusterName)) {
+        if (TableNameBuilder.isTableResource(resourceName)) {
+          IdealState idealState = helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
+          if (idealState == null || !idealState.isEnabled()) {
+            continue;
+          }
+          for (String partition : idealState.getPartitionSet()) {
+            if (idealState.getInstanceSet(partition).contains(_instanceId)) {
+              resourcesToMonitor.add(resourceName);
+              break;
+            }
+          }
+        }
+      }
 
-      long currentTime = startTime;
-      while (currentTime < endTime) {
-        if (noOnlineResources(spectatorManager, resources)) {
-          LOGGER.info("No online resource within {}ms. Total waiting Time: {}ms", _checkIntervalTimeMs,
-              (currentTime - startTime));
+      long checkIntervalMs = _serverConf
+          .getLong(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS, DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
+      while (System.currentTimeMillis() < endTimeMs) {
+        Iterator<String> iterator = resourcesToMonitor.iterator();
+        while (iterator.hasNext()) {
+          if (isResourceOffline(helixAdmin, iterator.next())) {
+            iterator.remove();
+          } else {
+            // Do not check remaining resources if one resource is not OFFLINE
+            break;
+          }
+        }
+        if (resourcesToMonitor.isEmpty()) {
+          LOGGER.info("All resources are OFFLINE after {}ms", System.currentTimeMillis() - startTimeMs);
           return;
         }
-
         try {
-          Thread.sleep(Math.min(_checkIntervalTimeMs, (endTime - currentTime)));
+          // The deadline may have passed during the OFFLINE checks above; Thread.sleep() rejects negative values
+          Thread.sleep(Math.max(0L, Math.min(checkIntervalMs, endTimeMs - System.currentTimeMillis())));
         } catch (InterruptedException e) {
-          LOGGER.error("Interrupted when waiting for no online resources.", e);
+          LOGGER.warn("Got interrupted while waiting for all resources OFFLINE", e);
           Thread.currentThread().interrupt();
-          return;
+          break;
         }
-        currentTime = System.currentTimeMillis();
       }
-      LOGGER.error(
-          "Reach timeout waiting for no online resources! Forcing Pinot server to shutdown. Max waiting time: {}ms",
-          _maxShutdownWaitTimeMs);
-    } catch (Exception e) {
-      LOGGER.error("Exception waiting until no online resources. Skip checking external view.", e);
-    } finally {
-      spectatorManager.disconnect();
-    }
-  }
-
-  private boolean noIncomingQueries(long currentTime) {
-    return currentTime > _serverInstance.getLatestQueryTime() + _checkIntervalTimeMs;
-  }
 
-  private boolean noOnlineResources(HelixManager spectatorManager, Set<String> resources) {
-    Iterator<String> iterator = resources.iterator();
-    while (iterator.hasNext()) {
-      String resourceName = iterator.next();
-      ExternalView externalView =
-          spectatorManager.getClusterManagmentTool().getResourceExternalView(_helixClusterName, resourceName);
-      if (externalView == null) {
-        iterator.remove();
-        continue;
-      }
-      for (String partition : externalView.getPartitionSet()) {
-        Map<String, String> instanceStateMap = externalView.getStateMap(partition);
-        if (instanceStateMap.containsKey(_instanceId)) {
-          if ("ONLINE".equals(instanceStateMap.get(_instanceId))) {
-            return false;
-          }
+      // Check all remaining resources
+      Iterator<String> iterator = resourcesToMonitor.iterator();
+      while (iterator.hasNext()) {
+        if (isResourceOffline(helixAdmin, iterator.next())) {
+          iterator.remove();
         }
       }
-      iterator.remove();
+      long currentTimeMs = System.currentTimeMillis();
+      if (resourcesToMonitor.isEmpty()) {
+        LOGGER.info("All resources are OFFLINE after {}ms", currentTimeMs - startTimeMs);
+      } else {
+        LOGGER.warn("There are still {} resources ONLINE within {}ms: {}", resourcesToMonitor.size(),
+            currentTimeMs - startTimeMs, resourcesToMonitor);
+      }
+    } finally {
+      if (helixAdmin != null) {
+        helixAdmin.close();
+      }
     }
-    return true;
   }
 
-  private Set<String> fetchLatestTableResources(HelixAdmin helixAdmin) {
-    Set<String> resourcesToMonitor = new HashSet<>();
-    for (String resourceName : helixAdmin.getResourcesInCluster(_helixClusterName)) {
-      // Only monitor table resources
-      if (!TableNameBuilder.isTableResource(resourceName)) {
-        continue;
-      }
-      // Only monitor enabled resources
-      IdealState idealState = helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
-      if (idealState.isEnabled()) {
-        for (String partitionName : idealState.getPartitionSet()) {
-          if (idealState.getInstanceSet(partitionName).contains(_instanceId)) {
-            resourcesToMonitor.add(resourceName);
-            break;
-          }
-        }
+  private boolean isResourceOffline(HelixAdmin helixAdmin, String resource) {
+    ExternalView externalView = helixAdmin.getResourceExternalView(_helixClusterName, resource);
+    // No external view (e.g. the resource has been deleted): treat the resource as OFFLINE
+    if (externalView == null) {
+      return true;
+    }
+    for (String partition : externalView.getPartitionSet()) {
+      Map<String, String> instanceStateMap = externalView.getStateMap(partition);
+      String state = instanceStateMap.get(_instanceId); // null if this instance has no entry for the partition
+      if ("ONLINE".equals(state) || "CONSUMING".equals(state)) {
+        return false; // Some partition is still ONLINE/CONSUMING on this instance
       }
     }
-    return resourcesToMonitor;
+    return true; // No partition is ONLINE/CONSUMING on this instance
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org