You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/22 03:44:32 UTC

[incubator-pinot] branch master updated: Add startup/shutdown checks for HelixServerStarter (#4222)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a0b8ce  Add startup/shutdown checks for HelixServerStarter (#4222)
8a0b8ce is described below

commit 8a0b8ce75474cf4d116a639a61cf107f3375647e
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue May 21 20:44:27 2019 -0700

    Add startup/shutdown checks for HelixServerStarter (#4222)
    
    Optional startup checks:
      - Service status check (ON by default)
    Optional shutdown checks:
      - Query check (drains and finishes existing queries, ON by default)
      - Resource check (wait for all resources OFFLINE, OFF by default)
    
    Change notes:
      - Service status check is a replacement of the segmentsLoadingCheck (OFF by default). When service status does not return GOOD, log the status description instead of looping over the segments, which is potentially expensive because it requires a bunch of ZK reads.
      - Query check is a replacement of the shutdownDelay (ON by default) and waitUntilNoIncomingQueries (not configurable)
      - Resource check is a replacement of the noOnlineResources (not configurable)
      - The resource check is OFF by default because: 1. the query check will ensure all queries are drained; 2. when the service status check is enabled, it will compare the currentState/externalView against the resource idealState, which is enough to ensure the segments are loaded before getting queries.
      - If the service status check is enabled, the server will wait until the check passes, then mark itself as up so the broker can send queries to it. This is the desired behavior because serving queries and loading segments at the same time might cause performance issues. But this will also delay the time at which the server starts serving queries.
    
    BACKWARD-INCOMPATIBLE:
    The following config keys are removed or replaced, but the new default behavior should be the desired one:
      - pinot.server.starter.enableSegmentsLoadingCheck
      - pinot.server.starter.timeoutInSeconds
      - pinot.server.instance.enable.shutdown.delay
      - pinot.server.instance.starter.maxShutdownWaitTime
      - pinot.server.instance.starter.checkIntervalTime
---
 .../apache/pinot/common/utils/CommonConstants.java |  59 ++-
 .../pinot/integration/tests/ClusterTest.java       |   7 +-
 .../server/starter/helix/HelixServerStarter.java   | 534 ++++++++++-----------
 3 files changed, 305 insertions(+), 295 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 7542cc5..de9a3d0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -171,13 +171,8 @@ public class CommonConstants {
     public static final String CONFIG_OF_REQUEST_HANDLER_FACTORY_CLASS = "pinot.server.requestHandlerFactory.class";
     public static final String CONFIG_OF_NETTY_PORT = "pinot.server.netty.port";
     public static final String CONFIG_OF_ADMIN_API_PORT = "pinot.server.adminapi.port";
-    public static final String CONFIG_OF_STARTER_ENABLE_SEGMENTS_LOADING_CHECK =
-        "pinot.server.starter.enableSegmentsLoadingCheck";
-    public static final String CONFIG_OF_STARTER_TIMEOUT_IN_SECONDS = "pinot.server.starter.timeoutInSeconds";
 
     public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = "pinot.server.instance.segment.format.version";
-    public static final String CONFIG_OF_ENABLE_DEFAULT_COLUMNS = "pinot.server.instance.enable.default.columns";
-    public static final String CONFIG_OF_ENABLE_SHUTDOWN_DELAY = "pinot.server.instance.enable.shutdown.delay";
     public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit";
     public static final String CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA =
         "pinot.server.instance.enable.commitend.metadata";
@@ -186,10 +181,6 @@ public class CommonConstants {
         "pinot.server.instance.realtime.alloc.offheap.direct";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.server.storage.factory";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "pinot.server.crypter";
-    public static final String CONFIG_OF_INSTANCE_MAX_SHUTDOWN_WAIT_TIME =
-        "pinot.server.instance.starter.maxShutdownWaitTime";
-    public static final String CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME =
-        "pinot.server.instance.starter.checkIntervalTime";
     // Configuration to consider the server ServiceStatus as being STARTED if the percent of resources (tables) that
     // are ONLINE for this this server has crossed the threshold percentage of the total number of tables
     // that it is expected to serve.
@@ -198,8 +189,6 @@ public class CommonConstants {
     public static final double DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
 
     public static final int DEFAULT_ADMIN_API_PORT = 8097;
-    public static final boolean DEFAULT_STARTER_ENABLE_SEGMENTS_LOADING_CHECK = false;
-    public static final int DEFAULT_STARTER_TIMEOUT_IN_SECONDS = 600;
     public static final String DEFAULT_READ_MODE = "heap";
     public static final String DEFAULT_INSTANCE_BASE_DIR =
         System.getProperty("java.io.tmpdir") + File.separator + "PinotServer";
@@ -214,10 +203,52 @@ public class CommonConstants {
     public static final String DEFAULT_REQUEST_HANDLER_FACTORY_CLASS =
         "org.apache.pinot.server.request.SimpleRequestHandlerFactory";
     public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY = "pinot.server.segment.fetcher";
-    public static final String DEFAULT_STAR_TREE_FORMAT_VERSION = "OFF_HEAP";
+
+    // Configs for server starter startup/shutdown checks
+    // Startup: timeout for the startup checks
+    public static final String CONFIG_OF_STARTUP_TIMEOUT_MS = "pinot.server.startup.timeoutMs";
+    public static final long DEFAULT_STARTUP_TIMEOUT_MS = 600_000L;
+    // Startup: enable service status check before claiming server up
+    public static final String CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK =
+        "pinot.server.startup.enableServiceStatusCheck";
+    public static final boolean DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK = true;
+    public static final String CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS =
+        "pinot.server.startup.serviceStatusCheckIntervalMs";
+    public static final long DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS = 10_000L;
+    // Shutdown: timeout for the shutdown checks
+    public static final String CONFIG_OF_SHUTDOWN_TIMEOUT_MS = "pinot.server.shutdown.timeoutMs";
+    public static final long DEFAULT_SHUTDOWN_TIMEOUT_MS = 600_000L;
+    // Shutdown: enable query check before shutting down the server
+    //           Will drain queries (no incoming queries and all existing queries finished)
+    public static final String CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK = "pinot.server.shutdown.enableQueryCheck";
+    public static final boolean DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK = true;
+    // Shutdown: threshold to mark that there is no incoming queries, use max query time as the default threshold
+    public static final String CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS = "pinot.server.shutdown.noQueryThresholdMs";
+    // Shutdown: enable resource check before shutting down the server
+    //           Will wait until all the resources in the external view are neither ONLINE nor CONSUMING
+    //           No need to enable this check if startup service status check is enabled
+    public static final String CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK = "pinot.server.shutdown.enableResourceCheck";
+    public static final boolean DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK = false;
+    public static final String CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS =
+        "pinot.server.shutdown.resourceCheckIntervalMs";
+    public static final long DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS = 10_000L;
+
+    // TODO: remove the deprecated config keys in the new release
+    @Deprecated
+    public static final String CONFIG_OF_STARTER_ENABLE_SEGMENTS_LOADING_CHECK =
+        "pinot.server.starter.enableSegmentsLoadingCheck";
+    @Deprecated
+    public static final String CONFIG_OF_STARTER_TIMEOUT_IN_SECONDS = "pinot.server.starter.timeoutInSeconds";
+    @Deprecated
+    public static final String CONFIG_OF_ENABLE_SHUTDOWN_DELAY = "pinot.server.instance.enable.shutdown.delay";
+    @Deprecated
+    public static final String CONFIG_OF_INSTANCE_MAX_SHUTDOWN_WAIT_TIME =
+        "pinot.server.instance.starter.maxShutdownWaitTime";
+    @Deprecated
+    public static final String CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME =
+        "pinot.server.instance.starter.checkIntervalTime";
+
     public static final String DEFAULT_COLUMN_MIN_MAX_VALUE_GENERATOR_MODE = "TIME";
-    public static final long DEFAULT_MAX_SHUTDOWN_WAIT_TIME_MS = 600_000L;
-    public static final long DEFAULT_CHECK_INTERVAL_TIME_MS = 60_000L;
 
     public static class SegmentCompletionProtocol {
       public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = "pinot.server.segment.uploader";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f0580a0..d4de2f9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -137,8 +137,7 @@ public abstract class ClusterTest extends ControllerTest {
     Configuration configuration = DefaultHelixStarterServerConfig.loadDefaultServerConf();
     configuration.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, LOCAL_HOST);
     configuration.setProperty(Server.CONFIG_OF_SEGMENT_FORMAT_VERSION, "v3");
-    configuration.addProperty(Server.CONFIG_OF_ENABLE_DEFAULT_COLUMNS, true);
-    configuration.setProperty(Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY, false);
+    configuration.setProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, false);
     return configuration;
   }
 
@@ -163,15 +162,13 @@ public abstract class ClusterTest extends ControllerTest {
   protected void startServers(int numServers, Configuration configuration, int baseAdminApiPort, int baseNettyPort,
       String zkStr) {
     try {
+      overrideServerConf(configuration);
       for (int i = 0; i < numServers; i++) {
         configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
         configuration
             .setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
         configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
         configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
-        // Set check interval time to 5 seconds for cluster tests.
-        configuration.setProperty(Server.CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME, 5_000L);
-        overrideServerConf(configuration);
         _serverStarters.add(new HelixServerStarter(_clusterName, zkStr, configuration));
       }
     } catch (Exception e) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index a026593..a8a6190 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.server.starter.helix;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,29 +26,23 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.CurrentState;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.Utils;
@@ -58,11 +51,9 @@ import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.ServiceStatus.Status;
-import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.apache.pinot.server.conf.ServerConf;
@@ -72,61 +63,87 @@ import org.apache.pinot.server.starter.ServerInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
+import static org.apache.pinot.common.utils.CommonConstants.Server.*;
+
 
 /**
- * Single server helix starter. Will start automatically with an untagged box.
- * Will auto join current cluster as a participant.
- *
- *
- *
+ * Starter for Pinot server.
+ * <p>When the server starts for the first time, it will automatically join the Helix cluster with the default tag.
+ * <ul>
+ *   <li>
+ *     Optional start-up checks:
+ *     <ul>
+ *       <li>Service status check (ON by default)</li>
+ *     </ul>
+ *   </li>
+ *   <li>
+ *     Optional shut-down checks:
+ *     <ul>
+ *       <li>Query check (drains and finishes existing queries, ON by default)</li>
+ *       <li>Resource check (wait for all resources OFFLINE, OFF by default)</li>
+ *     </ul>
+ *   </li>
+ * </ul>
  */
 public class HelixServerStarter {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixServerStarter.class);
 
   private final String _helixClusterName;
-  private final Configuration _helixServerConfig;
+  private final Configuration _serverConf;
   private final String _instanceId;
-  private final long _maxQueryTimeMs;
-  private final long _maxShutdownWaitTimeMs;
-  private final long _checkIntervalTimeMs;
   private final HelixManager _helixManager;
   private final HelixAdmin _helixAdmin;
   private final ServerInstance _serverInstance;
   private final AdminApiApplication _adminApiApplication;
   private final String _zkServers;
 
-  public HelixServerStarter(String helixClusterName, String zkServer, Configuration helixServerConfig)
+  public HelixServerStarter(String helixClusterName, String zkServer, Configuration serverConf)
       throws Exception {
     LOGGER.info("Starting Pinot server");
+    long startTimeMs = System.currentTimeMillis();
     _helixClusterName = helixClusterName;
 
     // Make a clone so that changes to the config won't propagate to the caller
-    _helixServerConfig = ConfigurationUtils.cloneConfiguration(helixServerConfig);
-
-    if (_helixServerConfig.containsKey(CommonConstants.Server.CONFIG_OF_INSTANCE_ID)) {
-      _instanceId = _helixServerConfig.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID);
-    } else {
-      String host =
-          _helixServerConfig.getString(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, NetUtil.getHostAddress());
-      int port = _helixServerConfig
-          .getInt(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
-      _instanceId = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + host + "_" + port;
-      _helixServerConfig.addProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_ID, _instanceId);
+    _serverConf = ConfigurationUtils.cloneConfiguration(serverConf);
+
+    // Log warnings for usage of deprecated config keys
+    Map<String, String> deprecatedConfigKeyWarnings = new HashMap<String, String>() {{
+      //noinspection deprecation
+      put(CONFIG_OF_STARTER_ENABLE_SEGMENTS_LOADING_CHECK, String.format(
+          "use %s instead, which will check the service status instead of comparing currentState/externalView with idealState (enabled by default)",
+          CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK));
+      //noinspection deprecation
+      put(CONFIG_OF_STARTER_TIMEOUT_IN_SECONDS, String
+          .format("use %s instead, which is the timeout for the whole startup process (10 minutes by default)",
+              CONFIG_OF_STARTUP_TIMEOUT_MS));
+      //noinspection deprecation
+      put(CONFIG_OF_ENABLE_SHUTDOWN_DELAY, String.format(
+          "use %s instead, which will drain the queries (no incoming queries and all existing queries finished) (enabled by default)",
+          CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK));
+      //noinspection deprecation
+      put(CONFIG_OF_INSTANCE_MAX_SHUTDOWN_WAIT_TIME, String
+          .format("use %s instead, which is the timeout for the whole shutdown process (10 minutes by default)",
+              CONFIG_OF_SHUTDOWN_TIMEOUT_MS));
+      //noinspection deprecation
+      put(CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME, String
+          .format("use %s instead, which is the interval for the resource check (10 seconds by default)",
+              CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS));
+    }};
+    for (Map.Entry<String, String> entry : deprecatedConfigKeyWarnings.entrySet()) {
+      String deprecatedConfigKey = entry.getKey();
+      if (_serverConf.containsKey(deprecatedConfigKey)) {
+        LOGGER.warn("Found usage of deprecated config key: {}, {}", deprecatedConfigKey, entry.getValue());
+      }
     }
 
-    _maxQueryTimeMs = _helixServerConfig.getLong(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT,
-        CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
-    _maxShutdownWaitTimeMs = _helixServerConfig
-        .getLong(CommonConstants.Server.CONFIG_OF_INSTANCE_MAX_SHUTDOWN_WAIT_TIME,
-            CommonConstants.Server.DEFAULT_MAX_SHUTDOWN_WAIT_TIME_MS);
-    long checkIntervalTimeMs = _helixServerConfig.getLong(CommonConstants.Server.CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME,
-        CommonConstants.Server.DEFAULT_CHECK_INTERVAL_TIME_MS);
-    if (checkIntervalTimeMs <= 0L) {
-      _checkIntervalTimeMs = CommonConstants.Server.DEFAULT_CHECK_INTERVAL_TIME_MS;
-      LOGGER.warn("Cannot set check interval time to non-positive value. Using the default setting: {}ms",
-          _checkIntervalTimeMs);
+    if (_serverConf.containsKey(CONFIG_OF_INSTANCE_ID)) {
+      _instanceId = _serverConf.getString(CONFIG_OF_INSTANCE_ID);
     } else {
-      _checkIntervalTimeMs = checkIntervalTimeMs;
+      String host = _serverConf.getString(KEY_OF_SERVER_NETTY_HOST, NetUtil.getHostAddress());
+      int port = _serverConf.getInt(KEY_OF_SERVER_NETTY_PORT, DEFAULT_SERVER_NETTY_PORT);
+      _instanceId = PREFIX_OF_SERVER_INSTANCE + host + "_" + port;
+      _serverConf.addProperty(CONFIG_OF_INSTANCE_ID, _instanceId);
     }
 
     LOGGER.info("Connecting Helix components");
@@ -135,7 +152,6 @@ public class HelixServerStarter {
     _zkServers = zkServer.replaceAll("\\s+", "");
     _helixManager =
         HelixManagerFactory.getZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
-    final StateMachineEngine stateMachineEngine = _helixManager.getStateMachineEngine();
     _helixManager.connect();
     _helixAdmin = _helixManager.getClusterManagmentTool();
     addInstanceTagIfNeeded(helixClusterName, _instanceId);
@@ -143,46 +159,43 @@ public class HelixServerStarter {
 
     LOGGER.info("Starting server instance");
     Utils.logVersions();
-    ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_helixServerConfig);
+    ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
     // Need to do this before we start receiving state transitions.
-    ServerSegmentCompletionProtocolHandler.init(_helixServerConfig.subset(
-        CommonConstants.Server.SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
+    ServerSegmentCompletionProtocolHandler
+        .init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
     _serverInstance = new ServerInstance();
     _serverInstance.init(serverInstanceConfig, propertyStore);
     _serverInstance.start();
 
     // Register state model factory
     SegmentFetcherAndLoader fetcherAndLoader =
-        new SegmentFetcherAndLoader(_helixServerConfig, _serverInstance.getInstanceDataManager(), propertyStore);
+        new SegmentFetcherAndLoader(_serverConf, _serverInstance.getInstanceDataManager(), propertyStore);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, _serverInstance.getInstanceDataManager(),
             fetcherAndLoader, propertyStore);
-    stateMachineEngine
+    _helixManager.getStateMachineEngine()
         .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
 
     // Start restlet server for admin API endpoint
-    int adminApiPort = _helixServerConfig
-        .getInt(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
+    int adminApiPort = _serverConf.getInt(CONFIG_OF_ADMIN_API_PORT, DEFAULT_ADMIN_API_PORT);
     _adminApiApplication = new AdminApiApplication(_serverInstance);
     _adminApiApplication.start(adminApiPort);
     setAdminApiPort(adminApiPort);
 
-    final ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
+    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     // Register message handler factory
     SegmentMessageHandlerFactory messageHandlerFactory =
         new SegmentMessageHandlerFactory(fetcherAndLoader, _serverInstance.getInstanceDataManager(), serverMetrics);
     _helixManager.getMessagingService()
         .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);
 
-    serverMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
-        () -> _helixManager.isConnected() ? 1L : 0L);
+    serverMetrics.addCallbackGauge(INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L);
     _helixManager
         .addPreConnectCallback(() -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
 
     // Register the service status handler
-    double minResourcePercentForStartup = _helixServerConfig
-        .getDouble(CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START,
-            CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
+    double minResourcePercentForStartup = _serverConf
+        .getDouble(CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START, DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
     ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
         .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _helixClusterName,
                 _instanceId, minResourcePercentForStartup),
@@ -191,7 +204,11 @@ public class HelixServerStarter {
 
     ControllerLeaderLocator.create(_helixManager);
 
-    waitForAllSegmentsLoaded();
+    if (_serverConf
+        .getBoolean(CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK, DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
+      long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_STARTUP_TIMEOUT_MS, DEFAULT_STARTUP_TIMEOUT_MS);
+      startupServiceStatusCheck(endTimeMs);
+    }
     setShuttingDownStatus(false);
     LOGGER.info("Pinot server ready");
 
@@ -203,111 +220,15 @@ public class HelixServerStarter {
     serverMetrics.addCallbackGauge("memory.allocationFailureCount", PinotDataBuffer::getAllocationFailureCount);
   }
 
-  private void waitForAllSegmentsLoaded() {
-    if (_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_STARTER_ENABLE_SEGMENTS_LOADING_CHECK,
-        CommonConstants.Server.DEFAULT_STARTER_ENABLE_SEGMENTS_LOADING_CHECK)) {
-      long startTime = System.currentTimeMillis();
-      int serverStarterTimeout = _helixServerConfig.getInt(CommonConstants.Server.CONFIG_OF_STARTER_TIMEOUT_IN_SECONDS,
-          CommonConstants.Server.DEFAULT_STARTER_TIMEOUT_IN_SECONDS);
-      long endTime = startTime + TimeUnit.SECONDS.toMillis(serverStarterTimeout);
-      boolean allSegmentsLoaded = false;
-      while (System.currentTimeMillis() < endTime) {
-        long timeToSleep = Math.min(TimeUnit.MILLISECONDS.toSeconds(endTime - System.currentTimeMillis()),
-            10 /* Sleep 10 seconds as default*/);
-        if (ServiceStatus.getServiceStatus() == Status.GOOD) {
-          LOGGER.info("All the segments are fully loaded into Pinot server, time taken: {} seconds",
-              TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
-          allSegmentsLoaded = true;
-          break;
-        }
-        try {
-          int numSegmentsLoaded = getNumSegmentLoaded();
-          int numSegmentsToLoad = getNumSegmentsToLoad();
-          LOGGER.warn("Waiting for all segments to be loaded, current progress: [ {} / {} ], sleep {} seconds...",
-              numSegmentsLoaded, numSegmentsToLoad, timeToSleep);
-          Thread.sleep(TimeUnit.SECONDS.toMillis(timeToSleep));
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return;
-        } catch (Exception e) {
-          LOGGER.warn("Caught exception during waiting for segments loading...", e);
-        }
-      }
-      if (!allSegmentsLoaded) {
-        LOGGER.info("Segments are not fully loaded within {} seconds...", serverStarterTimeout);
-        logSegmentsLoadingInfo();
-      }
-    }
-  }
-
-  private int getNumSegmentLoaded() {
-    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    if (instanceDataManager == null) {
-      return -1;
-    }
-
-    List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
-    int numSegmentsLoaded = 0;
-    for (String tableName : tableNames) {
-      numSegmentsLoaded += instanceDataManager.getAllSegmentsMetadata(tableName).size();
-    }
-    return numSegmentsLoaded;
-  }
-
-  private int getNumSegmentsToLoad() {
-    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    if (instanceDataManager == null) {
-      return -1;
-    }
-
-    HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    int numSegmentsToLoad = 0;
-    List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
-    for (String tableName : tableNames) {
-      LiveInstance liveInstance = helixDataAccessor.getProperty(keyBuilder.liveInstance(_instanceId));
-      String sessionId = liveInstance.getSessionId();
-      PropertyKey currentStateKey = keyBuilder.currentState(_instanceId, sessionId, tableName);
-      CurrentState currentState = helixDataAccessor.getProperty(currentStateKey);
-      if (currentState != null && currentState.isValid()) {
-        numSegmentsToLoad += currentState.getPartitionStateMap().size();
-      }
-    }
-    return numSegmentsToLoad;
-  }
-
-  private void logSegmentsLoadingInfo() {
-    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    if (instanceDataManager == null) {
-      return;
-    }
-    HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    LiveInstance liveInstance = helixDataAccessor.getProperty(keyBuilder.liveInstance(_instanceId));
-    String sessionId = liveInstance.getSessionId();
-    List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
-    for (String tableName : tableNames) {
-      PropertyKey currentStateKey = keyBuilder.currentState(_instanceId, sessionId, tableName);
-      CurrentState currentState = helixDataAccessor.getProperty(currentStateKey);
-      int numSegmentsLoaded = instanceDataManager.getAllSegmentsMetadata(tableName).size();
-      if (currentState != null && currentState.isValid()) {
-        int numSegmentsToLoad = currentState.getPartitionStateMap().size();
-        LOGGER.info(
-            "Segments are not fully loaded during server bootstrap, current progress: table: {}, segments loading progress [ {} / {} ]",
-            tableName, numSegmentsLoaded, numSegmentsToLoad);
-      }
-    }
-  }
-
   private void setAdminApiPort(int adminApiPort) {
     Map<String, String> propToUpdate = new HashMap<>();
-    propToUpdate.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort));
+    propToUpdate.put(Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort));
     updateInstanceConfigInHelix(propToUpdate);
   }
 
   private void setShuttingDownStatus(boolean shuttingDownStatus) {
     Map<String, String> propToUpdate = new HashMap<>();
-    propToUpdate.put(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, String.valueOf(shuttingDownStatus));
+    propToUpdate.put(IS_SHUTDOWN_IN_PROGRESS, String.valueOf(shuttingDownStatus));
     updateInstanceConfigInHelix(propToUpdate);
   }
 
@@ -328,7 +249,7 @@ public class HelixServerStarter {
         _helixAdmin.addInstanceTag(clusterName, instanceName,
             TableNameBuilder.REALTIME.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
       } else {
-        _helixAdmin.addInstanceTag(clusterName, instanceName, CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+        _helixAdmin.addInstanceTag(clusterName, instanceName, UNTAGGED_SERVER_INSTANCE);
       }
     }
   }
@@ -337,12 +258,50 @@ public class HelixServerStarter {
     // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
     // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
     // non-positive value, so set the default value as 1.
-    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _helixServerConfig
-        .getString(CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS,
-            CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
+    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
+        _serverConf.getString(CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS, DEFAULT_FLAPPING_TIME_WINDOW_MS));
+  }
+
+  /**
+   * When the server starts, polls the service status every check interval until it turns GOOD or the deadline passes.
+   * Throws IllegalStateException on BAD; on deadline or interrupt only logs a warning and returns.
+   * @param endTimeMs Deadline for the check, compared against System.currentTimeMillis()
+   */
+  private void startupServiceStatusCheck(long endTimeMs) {
+    LOGGER.info("Starting startup service status check");
+    long startTimeMs = System.currentTimeMillis();
+    long checkIntervalMs = _serverConf
+        .getLong(CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS, DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS);
+
+    while (System.currentTimeMillis() < endTimeMs) {
+      Status serviceStatus = ServiceStatus.getServiceStatus();
+      long currentTimeMs = System.currentTimeMillis();
+      if (serviceStatus == Status.GOOD) {
+        LOGGER.info("Service status is GOOD after {}ms", currentTimeMs - startTimeMs);
+        return;
+      } else if (serviceStatus == Status.BAD) {
+        throw new IllegalStateException("Service status is BAD");
+      }
+      long sleepTimeMs = Math.max(0L, Math.min(checkIntervalMs, endTimeMs - currentTimeMs)); // Thread.sleep needs >= 0
+      LOGGER.info("Sleep for {}ms as service status has not turned GOOD: {}", sleepTimeMs,
+          ServiceStatus.getStatusDescription());
+      try {
+        Thread.sleep(sleepTimeMs);
+      } catch (InterruptedException e) {
+        LOGGER.warn("Got interrupted while checking service status", e);
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    LOGGER.warn("Service status has not turned GOOD within {}ms: {}", System.currentTimeMillis() - startTimeMs,
+        ServiceStatus.getStatusDescription());
   }
 
   public void stop() {
+    LOGGER.info("Shutting down Pinot server");
+    long startTimeMs = System.currentTimeMillis();
+
     try {
       LOGGER.info("Closing PinotFS classes");
       PinotFSFactory.shutdown();
@@ -352,168 +311,191 @@ public class HelixServerStarter {
     _adminApiApplication.stop();
     setShuttingDownStatus(true);
 
-    // Total waiting time should include max query time.
-    final long endTime = _maxShutdownWaitTimeMs + System.currentTimeMillis();
-    if (_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY, true)) {
-      Uninterruptibles.sleepUninterruptibly(_maxQueryTimeMs, TimeUnit.MILLISECONDS);
+    long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_SHUTDOWN_TIMEOUT_MS, DEFAULT_SHUTDOWN_TIMEOUT_MS);
+    if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
+      shutdownQueryCheck(endTimeMs);
     }
-    waitUntilNoIncomingQueries(System.currentTimeMillis(), endTime);
     _helixManager.disconnect();
     _serverInstance.shutDown();
-    waitUntilNoOnlineResources(System.currentTimeMillis(), endTime);
-  }
-
-  private void waitUntilNoIncomingQueries(long startTime, final long endTime) {
-    if (startTime >= endTime) {
-      LOGGER.warn("Skip waiting until no incoming queries.");
-      return;
+    if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
+      shutdownResourceCheck(endTimeMs);
     }
-    LOGGER.info("Waiting upto {}ms until Pinot server doesn't receive any incoming queries...", (endTime - startTime));
-    long currentTime = startTime;
+  }
 
-    while (currentTime < endTime) {
-      if (noIncomingQueries(currentTime)) {
-        LOGGER.info("No incoming query within {}ms. Total waiting Time: {}ms", _checkIntervalTimeMs,
-            (currentTime - startTime));
-        return;
+  /**
+   * When shutting down the server, drains the queries (no incoming queries and all existing queries finished).
+   * Logs a warning instead of failing if the queries cannot be drained before the deadline or the wait is interrupted.
+   * @param endTimeMs Deadline for the check, compared against System.currentTimeMillis()
+   */
+  private void shutdownQueryCheck(long endTimeMs) {
+    LOGGER.info("Starting shutdown query check");
+    long startTimeMs = System.currentTimeMillis();
+
+    long maxQueryTimeMs = _serverConf.getLong(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+    long noQueryThresholdMs = _serverConf.getLong(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs);
+
+    // Wait until no query has been received for noQueryThresholdMs, or the deadline passes
+    boolean noIncomingQueries = false;
+    long currentTimeMs;
+    while ((currentTimeMs = System.currentTimeMillis()) < endTimeMs) {
+      long noQueryTimeMs = currentTimeMs - _serverInstance.getLatestQueryTime();
+      if (noQueryTimeMs >= noQueryThresholdMs) {
+        LOGGER.info("No query received within {}ms (larger than the threshold: {}ms), mark it as no incoming queries",
+            noQueryTimeMs, noQueryThresholdMs);
+        noIncomingQueries = true;
+        break;
       }
-
+      long sleepTimeMs = Math.min(noQueryThresholdMs - noQueryTimeMs, endTimeMs - currentTimeMs);
+      LOGGER.info(
+          "Sleep for {}ms as there are still incoming queries (no query time: {}ms is smaller than the threshold: {}ms)",
+          sleepTimeMs, noQueryTimeMs, noQueryThresholdMs);
       try {
-        Thread.sleep(Math.min(_maxQueryTimeMs, (endTime - currentTime)));
+        Thread.sleep(sleepTimeMs);
       } catch (InterruptedException e) {
-        LOGGER.error("Interrupted when waiting for Pinot server not to receive any queries.", e);
+        LOGGER.warn("Got interrupted while waiting for no incoming queries", e);
         Thread.currentThread().interrupt();
-        return;
+        break;
       }
-      currentTime = System.currentTimeMillis();
     }
-    LOGGER.error("Reach timeout when waiting for no incoming queries! Max waiting time: {}ms", _maxShutdownWaitTimeMs);
+    if (noIncomingQueries) {
+      // Wait until latest query time + maxQueryTimeMs so existing queries can finish (not capped by endTimeMs)
+      long latestQueryFinishTimeMs = _serverInstance.getLatestQueryTime() + maxQueryTimeMs;
+      if (latestQueryFinishTimeMs > currentTimeMs) {
+        long sleepTimeMs = latestQueryFinishTimeMs - currentTimeMs;
+        LOGGER.info("Sleep for {}ms to ensure all the existing queries are finished", sleepTimeMs);
+        try {
+          Thread.sleep(sleepTimeMs);
+        } catch (InterruptedException e) {
+          LOGGER.warn("Got interrupted while waiting for all the existing queries to be finished", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+      LOGGER.info("Finished draining queries after {}ms", System.currentTimeMillis() - startTimeMs);
+    } else {
+      LOGGER.warn("Failed to drain queries within {}ms", System.currentTimeMillis() - startTimeMs);
+    }
   }
 
   /**
-   * Init a helix spectator to watch the external view updates.
+   * When shutting down the server, waits for all the resources to turn OFFLINE (all partitions served by the
+   * server are neither ONLINE nor CONSUMING). Only logs a warning if some are still ONLINE when the wait ends.
+   *
+   * @param endTimeMs Deadline for the check, compared against System.currentTimeMillis()
    */
-  private void waitUntilNoOnlineResources(long startTime, final long endTime) {
-    if (startTime >= endTime) {
-      LOGGER.warn("Skip waiting until no online resources.");
+  private void shutdownResourceCheck(long endTimeMs) {
+    LOGGER.info("Starting shutdown resource check");
+    long startTimeMs = System.currentTimeMillis();
+
+    if (startTimeMs >= endTimeMs) {
+      LOGGER.warn("Skipping shutdown resource check because shutdown timeout is already reached");
       return;
     }
-    LOGGER.info("Waiting upto {}ms until no online resources...", (endTime - startTime));
 
-    // Initialize a helix spectator.
-    HelixManager spectatorManager =
-        HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
+    HelixAdmin helixAdmin = null;
     try {
-      spectatorManager.connect();
-
-      Set<String> resources = fetchLatestTableResources(spectatorManager.getClusterManagmentTool());
+      helixAdmin = new ZKHelixAdmin(_zkServers);
+
+      // Monitor all enabled table resources that the server serves
+      Set<String> resourcesToMonitor = new HashSet<>();
+      for (String resourceName : helixAdmin.getResourcesInCluster(_helixClusterName)) {
+        if (TableNameBuilder.isTableResource(resourceName)) {
+          IdealState idealState = helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
+          if (idealState == null || !idealState.isEnabled()) {
+            continue;
+          }
+          for (String partition : idealState.getPartitionSet()) {
+            if (idealState.getInstanceSet(partition).contains(_instanceId)) {
+              resourcesToMonitor.add(resourceName);
+              break;
+            }
+          }
+        }
+      }
 
-      long currentTime = startTime;
-      while (currentTime < endTime) {
-        if (noOnlineResources(spectatorManager, resources)) {
-          LOGGER.info("No online resource within {}ms. Total waiting Time: {}ms", _checkIntervalTimeMs,
-              (currentTime - startTime));
+      long checkIntervalMs = _serverConf
+          .getLong(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS, DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
+      while (System.currentTimeMillis() < endTimeMs) {
+        Iterator<String> iterator = resourcesToMonitor.iterator();
+        String currentResource = null;
+        while (iterator.hasNext()) {
+          currentResource = iterator.next();
+          if (isResourceOffline(helixAdmin, currentResource)) {
+            iterator.remove();
+          } else {
+            // Do not check remaining resources if one resource is not OFFLINE
+            break;
+          }
+        }
+        long currentTimeMs = System.currentTimeMillis();
+        if (resourcesToMonitor.isEmpty()) {
+          LOGGER.info("All resources are OFFLINE after {}ms", currentTimeMs - startTimeMs);
           return;
         }
-
+        long sleepTimeMs = Math.max(0L, Math.min(checkIntervalMs, endTimeMs - currentTimeMs)); // Thread.sleep needs >= 0
+        LOGGER.info("Sleep for {}ms as some resources [{}, ...] are still ONLINE", sleepTimeMs, currentResource);
         try {
-          Thread.sleep(Math.min(_checkIntervalTimeMs, (endTime - currentTime)));
+          Thread.sleep(sleepTimeMs);
         } catch (InterruptedException e) {
-          LOGGER.error("Interrupted when waiting for no online resources.", e);
+          LOGGER.warn("Got interrupted while waiting for all resources OFFLINE", e);
           Thread.currentThread().interrupt();
-          return;
+          break;
         }
-        currentTime = System.currentTimeMillis();
       }
-      LOGGER.error(
-          "Reach timeout waiting for no online resources! Forcing Pinot server to shutdown. Max waiting time: {}ms",
-          _maxShutdownWaitTimeMs);
-    } catch (Exception e) {
-      LOGGER.error("Exception waiting until no online resources. Skip checking external view.", e);
-    } finally {
-      spectatorManager.disconnect();
-    }
-  }
-
-  private boolean noIncomingQueries(long currentTime) {
-    return currentTime > _serverInstance.getLatestQueryTime() + _checkIntervalTimeMs;
-  }
 
-  private boolean noOnlineResources(HelixManager spectatorManager, Set<String> resources) {
-    Iterator<String> iterator = resources.iterator();
-    while (iterator.hasNext()) {
-      String resourceName = iterator.next();
-      ExternalView externalView =
-          spectatorManager.getClusterManagmentTool().getResourceExternalView(_helixClusterName, resourceName);
-      if (externalView == null) {
-        iterator.remove();
-        continue;
-      }
-      for (String partition : externalView.getPartitionSet()) {
-        Map<String, String> instanceStateMap = externalView.getStateMap(partition);
-        if (instanceStateMap.containsKey(_instanceId)) {
-          if ("ONLINE".equals(instanceStateMap.get(_instanceId))) {
-            return false;
-          }
+      // Timed out or interrupted: do one final pass over all remaining resources before reporting
+      Iterator<String> iterator = resourcesToMonitor.iterator();
+      while (iterator.hasNext()) {
+        if (isResourceOffline(helixAdmin, iterator.next())) {
+          iterator.remove();
         }
       }
-      iterator.remove();
+      long currentTimeMs = System.currentTimeMillis();
+      if (resourcesToMonitor.isEmpty()) {
+        LOGGER.info("All resources are OFFLINE after {}ms", currentTimeMs - startTimeMs);
+      } else {
+        LOGGER.warn("There are still {} resources ONLINE within {}ms: {}", resourcesToMonitor.size(),
+            currentTimeMs - startTimeMs, resourcesToMonitor);
+      }
+    } finally {
+      if (helixAdmin != null) {
+        helixAdmin.close();
+      }
     }
-    return true;
   }
 
-  private Set<String> fetchLatestTableResources(HelixAdmin helixAdmin) {
-    Set<String> resourcesToMonitor = new HashSet<>();
-    for (String resourceName : helixAdmin.getResourcesInCluster(_helixClusterName)) {
-      // Only monitor table resources
-      if (!TableNameBuilder.isTableResource(resourceName)) {
-        continue;
-      }
-      // Only monitor enabled resources
-      IdealState idealState = helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
-      if (idealState.isEnabled()) {
-        for (String partitionName : idealState.getPartitionSet()) {
-          if (idealState.getInstanceSet(partitionName).contains(_instanceId)) {
-            resourcesToMonitor.add(resourceName);
-            break;
-          }
-        }
+  private boolean isResourceOffline(HelixAdmin helixAdmin, String resource) {
+    ExternalView externalView = helixAdmin.getResourceExternalView(_helixClusterName, resource);
+    // A missing external view (e.g. the resource has been deleted) is treated as OFFLINE
+    if (externalView == null) {
+      return true;
+    }
+    for (String partition : externalView.getPartitionSet()) {
+      Map<String, String> instanceStateMap = externalView.getStateMap(partition);
+      String state = instanceStateMap.get(_instanceId); // null when this instance has no entry for the partition
+      if (StateModel.SegmentOnlineOfflineStateModel.ONLINE.equals(state)
+          || StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING.equals(state)) {
+        return false;
       }
     }
-    return resourcesToMonitor;
+    return true;
   }
 
   /**
    * This method is for reference purpose only.
    */
+  @SuppressWarnings("UnusedReturnValue")
   public static HelixServerStarter startDefault()
       throws Exception {
-    Configuration configuration = new PropertiesConfiguration();
+    Configuration serverConf = new BaseConfiguration();
     int port = 8003;
-    configuration.addProperty(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT, port);
-    configuration.addProperty("pinot.server.instance.dataDir", "/tmp/PinotServer/test" + port + "/index");
-    configuration.addProperty("pinot.server.instance.segmentTarDir", "/tmp/PinotServer/test" + port + "/segmentTar");
-    return new HelixServerStarter("quickstart", "localhost:2191", configuration);
+    serverConf.addProperty(KEY_OF_SERVER_NETTY_PORT, port);
+    serverConf.addProperty(CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port + "/index");
+    serverConf.addProperty(CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test" + port + "/segmentTar");
+    return new HelixServerStarter("quickstart", "localhost:2191", serverConf);
   }
 
   public static void main(String[] args)
       throws Exception {
-    /*
-    // Another way to start a server via IDE
-    if (args.length < 1) {
-      throw new RuntimeException("Usage: cmd <port>");
-    }
-    for (int i = 0; i < args.length; i++) {
-      final int port = Integer.valueOf(args[i]);
-      final String serverFQDN = "localhost";
-      final String server = "Server_" + serverFQDN + "_" + port;
-      final Configuration configuration = new PropertiesConfiguration();
-      configuration.addProperty("pinot.server.instance.dataDir", "/tmp/PinotServer/test" + port + "/index");
-      configuration.addProperty("pinot.server.instance.segmentTarDir", "/tmp/PinotServer/test" + port + "/segmentTar");
-      configuration.addProperty("instanceId", server);
-      final HelixServerStarter pinotHelixStarter = new HelixServerStarter("PinotPerfTestCluster", "localhost:2191", configuration);
-    }
-    */
     startDefault();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org