You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/19 00:11:56 UTC
[incubator-pinot] 01/01: Add startup/shutdown checks for
HelixServerStarter
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch server_restart
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 630245ac5effc08ba0f8669dd2de034d70634cb5
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Sat May 18 17:05:44 2019 -0700
Add startup/shutdown checks for HelixServerStarter
Optional startup checks:
- Service status check (ON by default)
Optional shutdown checks:
- Query check (drains and finishes existing queries, ON by default)
- Resource check (wait for all resources OFFLINE, OFF by default)
BACKWARD-INCOMPATIBLE:
The following config keys are removed or replaced, but the new default
behavior should be the desired one in most cases:
- pinot.server.starter.enableSegmentsLoadingCheck
- pinot.server.starter.timeoutInSeconds
- pinot.server.instance.enable.shutdown.delay
- pinot.server.instance.starter.maxShutdownWaitTime
- pinot.server.instance.starter.checkIntervalTime
---
.../apache/pinot/common/utils/CommonConstants.java | 44 +-
.../pinot/integration/tests/ClusterTest.java | 7 +-
.../server/starter/helix/HelixServerStarter.java | 443 +++++++++------------
3 files changed, 230 insertions(+), 264 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 7542cc5..ca6a80f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -171,13 +171,8 @@ public class CommonConstants {
public static final String CONFIG_OF_REQUEST_HANDLER_FACTORY_CLASS = "pinot.server.requestHandlerFactory.class";
public static final String CONFIG_OF_NETTY_PORT = "pinot.server.netty.port";
public static final String CONFIG_OF_ADMIN_API_PORT = "pinot.server.adminapi.port";
- public static final String CONFIG_OF_STARTER_ENABLE_SEGMENTS_LOADING_CHECK =
- "pinot.server.starter.enableSegmentsLoadingCheck";
- public static final String CONFIG_OF_STARTER_TIMEOUT_IN_SECONDS = "pinot.server.starter.timeoutInSeconds";
public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = "pinot.server.instance.segment.format.version";
- public static final String CONFIG_OF_ENABLE_DEFAULT_COLUMNS = "pinot.server.instance.enable.default.columns";
- public static final String CONFIG_OF_ENABLE_SHUTDOWN_DELAY = "pinot.server.instance.enable.shutdown.delay";
public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit";
public static final String CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA =
"pinot.server.instance.enable.commitend.metadata";
@@ -186,10 +181,6 @@ public class CommonConstants {
"pinot.server.instance.realtime.alloc.offheap.direct";
public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.server.storage.factory";
public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "pinot.server.crypter";
- public static final String CONFIG_OF_INSTANCE_MAX_SHUTDOWN_WAIT_TIME =
- "pinot.server.instance.starter.maxShutdownWaitTime";
- public static final String CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME =
- "pinot.server.instance.starter.checkIntervalTime";
// Configuration to consider the server ServiceStatus as being STARTED if the percent of resources (tables) that
// are ONLINE for this this server has crossed the threshold percentage of the total number of tables
// that it is expected to serve.
@@ -198,8 +189,6 @@ public class CommonConstants {
public static final double DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
public static final int DEFAULT_ADMIN_API_PORT = 8097;
- public static final boolean DEFAULT_STARTER_ENABLE_SEGMENTS_LOADING_CHECK = false;
- public static final int DEFAULT_STARTER_TIMEOUT_IN_SECONDS = 600;
public static final String DEFAULT_READ_MODE = "heap";
public static final String DEFAULT_INSTANCE_BASE_DIR =
System.getProperty("java.io.tmpdir") + File.separator + "PinotServer";
@@ -214,10 +203,37 @@ public class CommonConstants {
public static final String DEFAULT_REQUEST_HANDLER_FACTORY_CLASS =
"org.apache.pinot.server.request.SimpleRequestHandlerFactory";
public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY = "pinot.server.segment.fetcher";
- public static final String DEFAULT_STAR_TREE_FORMAT_VERSION = "OFF_HEAP";
+
+ // Configs for server starter startup/shutdown checks
+ // Startup: timeout for the startup checks
+ public static final String CONFIG_OF_STARTUP_TIMEOUT_MS = "pinot.server.startup.timeoutMs";
+ public static final long DEFAULT_STARTUP_TIMEOUT_MS = 600_000L;
+ // Startup: enable service status check before claiming server up
+ public static final String CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK =
+ "pinot.server.startup.enableServiceStatusCheck";
+ public static final boolean DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK = true;
+ public static final String CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS =
+ "pinot.server.startup.serviceStatusCheckIntervalMs";
+ public static final long DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS = 10_000L;
+ // Shutdown: timeout for the shutdown checks
+ public static final String CONFIG_OF_SHUTDOWN_TIMEOUT_MS = "pinot.server.shutdown.timeoutMs";
+ public static final long DEFAULT_SHUTDOWN_TIMEOUT_MS = 600_000L;
+ // Shutdown: enable query check before shutting down the server
+ // Will wait until no new query is received and all existing queries have finished
+ public static final String CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK = "pinot.server.shutdown.enableQueryCheck";
+ public static final boolean DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK = true;
+ // Shutdown: threshold to mark that there is no new query received, use max query time as the default threshold
+ public static final String CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS = "pinot.server.shutdown.noQueryThresholdMs";
+ // Shutdown: enable resource check before shutting down the server
+ // Will wait until all the resources in the external view are neither ONLINE nor CONSUMING
+ // No need to enable this check if startup service status check is enabled
+ public static final String CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK = "pinot.server.shutdown.enableResourceCheck";
+ public static final boolean DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK = false;
+ public static final String CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS =
+ "pinot.server.shutdown.resourceCheckIntervalMs";
+ public static final long DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS = 10_000L;
+
public static final String DEFAULT_COLUMN_MIN_MAX_VALUE_GENERATOR_MODE = "TIME";
- public static final long DEFAULT_MAX_SHUTDOWN_WAIT_TIME_MS = 600_000L;
- public static final long DEFAULT_CHECK_INTERVAL_TIME_MS = 60_000L;
public static class SegmentCompletionProtocol {
public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = "pinot.server.segment.uploader";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f0580a0..d4de2f9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -137,8 +137,7 @@ public abstract class ClusterTest extends ControllerTest {
Configuration configuration = DefaultHelixStarterServerConfig.loadDefaultServerConf();
configuration.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, LOCAL_HOST);
configuration.setProperty(Server.CONFIG_OF_SEGMENT_FORMAT_VERSION, "v3");
- configuration.addProperty(Server.CONFIG_OF_ENABLE_DEFAULT_COLUMNS, true);
- configuration.setProperty(Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY, false);
+ configuration.setProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, false);
return configuration;
}
@@ -163,15 +162,13 @@ public abstract class ClusterTest extends ControllerTest {
protected void startServers(int numServers, Configuration configuration, int baseAdminApiPort, int baseNettyPort,
String zkStr) {
try {
+ overrideServerConf(configuration);
for (int i = 0; i < numServers; i++) {
configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
configuration
.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
- // Set check interval time to 5 seconds for cluster tests.
- configuration.setProperty(Server.CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME, 5_000L);
- overrideServerConf(configuration);
_serverStarters.add(new HelixServerStarter(_clusterName, zkStr, configuration));
}
} catch (Exception e) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index a026593..59453f1 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -19,7 +19,6 @@
package org.apache.pinot.server.starter.helix;
import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,29 +26,23 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.ZNRecord;
-import org.apache.helix.model.CurrentState;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.Utils;
@@ -62,7 +55,6 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.ServiceStatus.Status;
-import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
import org.apache.pinot.filesystem.PinotFSFactory;
import org.apache.pinot.server.conf.ServerConf;
@@ -72,61 +64,57 @@ import org.apache.pinot.server.starter.ServerInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.common.utils.CommonConstants.Server.*;
+
/**
- * Single server helix starter. Will start automatically with an untagged box.
- * Will auto join current cluster as a participant.
- *
- *
- *
+ * Starter for Helix-based Pinot server.
+ * <p>When an instance starts for the first time, it will automatically join the Helix cluster with the default tag.
+ * <ul>
+ * <li>
+ * Optional start-up checks:
+ * <ul>
+ * <li>Service status check (ON by default)</li>
+ * </ul>
+ * </li>
+ * <li>
+ * Optional shut-down checks:
+ * <ul>
+ * <li>Query check (drains and finishes existing queries, ON by default)</li>
+ * <li>Resource check (wait for all resources OFFLINE, OFF by default)</li>
+ * </ul>
+ * </li>
+ * </ul>
*/
public class HelixServerStarter {
private static final Logger LOGGER = LoggerFactory.getLogger(HelixServerStarter.class);
private final String _helixClusterName;
- private final Configuration _helixServerConfig;
+ private final Configuration _serverConf;
private final String _instanceId;
- private final long _maxQueryTimeMs;
- private final long _maxShutdownWaitTimeMs;
- private final long _checkIntervalTimeMs;
private final HelixManager _helixManager;
private final HelixAdmin _helixAdmin;
private final ServerInstance _serverInstance;
private final AdminApiApplication _adminApiApplication;
private final String _zkServers;
- public HelixServerStarter(String helixClusterName, String zkServer, Configuration helixServerConfig)
+ public HelixServerStarter(String helixClusterName, String zkServer, Configuration serverConf)
throws Exception {
LOGGER.info("Starting Pinot server");
+ long startTimeMs = System.currentTimeMillis();
_helixClusterName = helixClusterName;
// Make a clone so that changes to the config won't propagate to the caller
- _helixServerConfig = ConfigurationUtils.cloneConfiguration(helixServerConfig);
+ _serverConf = ConfigurationUtils.cloneConfiguration(serverConf);
- if (_helixServerConfig.containsKey(CommonConstants.Server.CONFIG_OF_INSTANCE_ID)) {
- _instanceId = _helixServerConfig.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID);
+ if (_serverConf.containsKey(CONFIG_OF_INSTANCE_ID)) {
+ _instanceId = _serverConf.getString(CONFIG_OF_INSTANCE_ID);
} else {
- String host =
- _helixServerConfig.getString(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, NetUtil.getHostAddress());
- int port = _helixServerConfig
+ String host = _serverConf.getString(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, NetUtil.getHostAddress());
+ int port = _serverConf
.getInt(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
_instanceId = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + host + "_" + port;
- _helixServerConfig.addProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_ID, _instanceId);
- }
-
- _maxQueryTimeMs = _helixServerConfig.getLong(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT,
- CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
- _maxShutdownWaitTimeMs = _helixServerConfig
- .getLong(CommonConstants.Server.CONFIG_OF_INSTANCE_MAX_SHUTDOWN_WAIT_TIME,
- CommonConstants.Server.DEFAULT_MAX_SHUTDOWN_WAIT_TIME_MS);
- long checkIntervalTimeMs = _helixServerConfig.getLong(CommonConstants.Server.CONFIG_OF_INSTANCE_CHECK_INTERVAL_TIME,
- CommonConstants.Server.DEFAULT_CHECK_INTERVAL_TIME_MS);
- if (checkIntervalTimeMs <= 0L) {
- _checkIntervalTimeMs = CommonConstants.Server.DEFAULT_CHECK_INTERVAL_TIME_MS;
- LOGGER.warn("Cannot set check interval time to non-positive value. Using the default setting: {}ms",
- _checkIntervalTimeMs);
- } else {
- _checkIntervalTimeMs = checkIntervalTimeMs;
+ _serverConf.addProperty(CONFIG_OF_INSTANCE_ID, _instanceId);
}
LOGGER.info("Connecting Helix components");
@@ -135,7 +123,6 @@ public class HelixServerStarter {
_zkServers = zkServer.replaceAll("\\s+", "");
_helixManager =
HelixManagerFactory.getZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
- final StateMachineEngine stateMachineEngine = _helixManager.getStateMachineEngine();
_helixManager.connect();
_helixAdmin = _helixManager.getClusterManagmentTool();
addInstanceTagIfNeeded(helixClusterName, _instanceId);
@@ -143,31 +130,30 @@ public class HelixServerStarter {
LOGGER.info("Starting server instance");
Utils.logVersions();
- ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_helixServerConfig);
+ ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
// Need to do this before we start receiving state transitions.
- ServerSegmentCompletionProtocolHandler.init(_helixServerConfig.subset(
- CommonConstants.Server.SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
+ ServerSegmentCompletionProtocolHandler
+ .init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
_serverInstance = new ServerInstance();
_serverInstance.init(serverInstanceConfig, propertyStore);
_serverInstance.start();
// Register state model factory
SegmentFetcherAndLoader fetcherAndLoader =
- new SegmentFetcherAndLoader(_helixServerConfig, _serverInstance.getInstanceDataManager(), propertyStore);
+ new SegmentFetcherAndLoader(_serverConf, _serverInstance.getInstanceDataManager(), propertyStore);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId, _serverInstance.getInstanceDataManager(),
fetcherAndLoader, propertyStore);
- stateMachineEngine
+ _helixManager.getStateMachineEngine()
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
// Start restlet server for admin API endpoint
- int adminApiPort = _helixServerConfig
- .getInt(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
+ int adminApiPort = _serverConf.getInt(CONFIG_OF_ADMIN_API_PORT, DEFAULT_ADMIN_API_PORT);
_adminApiApplication = new AdminApiApplication(_serverInstance);
_adminApiApplication.start(adminApiPort);
setAdminApiPort(adminApiPort);
- final ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
+ ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
// Register message handler factory
SegmentMessageHandlerFactory messageHandlerFactory =
new SegmentMessageHandlerFactory(fetcherAndLoader, _serverInstance.getInstanceDataManager(), serverMetrics);
@@ -180,9 +166,8 @@ public class HelixServerStarter {
.addPreConnectCallback(() -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
// Register the service status handler
- double minResourcePercentForStartup = _helixServerConfig
- .getDouble(CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START,
- CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
+ double minResourcePercentForStartup = _serverConf
+ .getDouble(CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START, DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
.of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _helixClusterName,
_instanceId, minResourcePercentForStartup),
@@ -191,7 +176,11 @@ public class HelixServerStarter {
ControllerLeaderLocator.create(_helixManager);
- waitForAllSegmentsLoaded();
+ if (_serverConf
+ .getBoolean(CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK, DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
+ long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_STARTUP_TIMEOUT_MS, DEFAULT_STARTUP_TIMEOUT_MS);
+ startupServiceStatusCheck(endTimeMs);
+ }
setShuttingDownStatus(false);
LOGGER.info("Pinot server ready");
@@ -203,102 +192,6 @@ public class HelixServerStarter {
serverMetrics.addCallbackGauge("memory.allocationFailureCount", PinotDataBuffer::getAllocationFailureCount);
}
- private void waitForAllSegmentsLoaded() {
- if (_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_STARTER_ENABLE_SEGMENTS_LOADING_CHECK,
- CommonConstants.Server.DEFAULT_STARTER_ENABLE_SEGMENTS_LOADING_CHECK)) {
- long startTime = System.currentTimeMillis();
- int serverStarterTimeout = _helixServerConfig.getInt(CommonConstants.Server.CONFIG_OF_STARTER_TIMEOUT_IN_SECONDS,
- CommonConstants.Server.DEFAULT_STARTER_TIMEOUT_IN_SECONDS);
- long endTime = startTime + TimeUnit.SECONDS.toMillis(serverStarterTimeout);
- boolean allSegmentsLoaded = false;
- while (System.currentTimeMillis() < endTime) {
- long timeToSleep = Math.min(TimeUnit.MILLISECONDS.toSeconds(endTime - System.currentTimeMillis()),
- 10 /* Sleep 10 seconds as default*/);
- if (ServiceStatus.getServiceStatus() == Status.GOOD) {
- LOGGER.info("All the segments are fully loaded into Pinot server, time taken: {} seconds",
- TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
- allSegmentsLoaded = true;
- break;
- }
- try {
- int numSegmentsLoaded = getNumSegmentLoaded();
- int numSegmentsToLoad = getNumSegmentsToLoad();
- LOGGER.warn("Waiting for all segments to be loaded, current progress: [ {} / {} ], sleep {} seconds...",
- numSegmentsLoaded, numSegmentsToLoad, timeToSleep);
- Thread.sleep(TimeUnit.SECONDS.toMillis(timeToSleep));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- } catch (Exception e) {
- LOGGER.warn("Caught exception during waiting for segments loading...", e);
- }
- }
- if (!allSegmentsLoaded) {
- LOGGER.info("Segments are not fully loaded within {} seconds...", serverStarterTimeout);
- logSegmentsLoadingInfo();
- }
- }
- }
-
- private int getNumSegmentLoaded() {
- InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
- if (instanceDataManager == null) {
- return -1;
- }
-
- List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
- int numSegmentsLoaded = 0;
- for (String tableName : tableNames) {
- numSegmentsLoaded += instanceDataManager.getAllSegmentsMetadata(tableName).size();
- }
- return numSegmentsLoaded;
- }
-
- private int getNumSegmentsToLoad() {
- InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
- if (instanceDataManager == null) {
- return -1;
- }
-
- HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- int numSegmentsToLoad = 0;
- List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
- for (String tableName : tableNames) {
- LiveInstance liveInstance = helixDataAccessor.getProperty(keyBuilder.liveInstance(_instanceId));
- String sessionId = liveInstance.getSessionId();
- PropertyKey currentStateKey = keyBuilder.currentState(_instanceId, sessionId, tableName);
- CurrentState currentState = helixDataAccessor.getProperty(currentStateKey);
- if (currentState != null && currentState.isValid()) {
- numSegmentsToLoad += currentState.getPartitionStateMap().size();
- }
- }
- return numSegmentsToLoad;
- }
-
- private void logSegmentsLoadingInfo() {
- InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
- if (instanceDataManager == null) {
- return;
- }
- HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- LiveInstance liveInstance = helixDataAccessor.getProperty(keyBuilder.liveInstance(_instanceId));
- String sessionId = liveInstance.getSessionId();
- List<String> tableNames = _helixAdmin.getResourcesInCluster(_helixClusterName);
- for (String tableName : tableNames) {
- PropertyKey currentStateKey = keyBuilder.currentState(_instanceId, sessionId, tableName);
- CurrentState currentState = helixDataAccessor.getProperty(currentStateKey);
- int numSegmentsLoaded = instanceDataManager.getAllSegmentsMetadata(tableName).size();
- if (currentState != null && currentState.isValid()) {
- int numSegmentsToLoad = currentState.getPartitionStateMap().size();
- LOGGER.info(
- "Segments are not fully loaded during server bootstrap, current progress: table: {}, segments loading progress [ {} / {} ]",
- tableName, numSegmentsLoaded, numSegmentsToLoad);
- }
- }
- }
-
private void setAdminApiPort(int adminApiPort) {
Map<String, String> propToUpdate = new HashMap<>();
propToUpdate.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort));
@@ -337,12 +230,48 @@ public class HelixServerStarter {
// NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
// from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
// non-positive value, so set the default value as 1.
- System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _helixServerConfig
+ System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _serverConf
.getString(CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS,
CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}
+ /**
+ * When the server starts, polls the service status until it turns GOOD or the deadline passes.
+ * Throws IllegalStateException if the status is BAD; only logs a warning on timeout or interrupt.
+ * @param endTimeMs Deadline for the check (absolute time, comparable to System.currentTimeMillis())
+ */
+ private void startupServiceStatusCheck(long endTimeMs) {
+ LOGGER.info("Starting startup service status check");
+ long startTimeMs = System.currentTimeMillis();
+ long checkIntervalMs = _serverConf
+ .getLong(CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS, DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS);
+ // NOTE: the remaining time is clamped at 0 in the loop because Thread.sleep() rejects negative values
+ while (System.currentTimeMillis() < endTimeMs) {
+ Status serviceStatus = ServiceStatus.getServiceStatus();
+ long currentTimeMs = System.currentTimeMillis();
+ if (serviceStatus == Status.GOOD) {
+ LOGGER.info("Service status is GOOD after {}ms", currentTimeMs - startTimeMs);
+ return;
+ } else if (serviceStatus == Status.BAD) {
+ throw new IllegalStateException("Service status is BAD");
+ }
+ try {
+ Thread.sleep(Math.min(checkIntervalMs, Math.max(0L, endTimeMs - currentTimeMs)));
+ } catch (InterruptedException e) {
+ LOGGER.warn("Got interrupted while checking service status", e);
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ LOGGER.warn("Service status has not turned GOOD within {}ms: {}", System.currentTimeMillis() - startTimeMs,
+ ServiceStatus.getStatusDescription());
+ }
+
public void stop() {
+ LOGGER.info("Shutting down Pinot server");
+ long startTimeMs = System.currentTimeMillis();
+
try {
LOGGER.info("Closing PinotFS classes");
PinotFSFactory.shutdown();
@@ -352,135 +281,159 @@ public class HelixServerStarter {
_adminApiApplication.stop();
setShuttingDownStatus(true);
- // Total waiting time should include max query time.
- final long endTime = _maxShutdownWaitTimeMs + System.currentTimeMillis();
- if (_helixServerConfig.getBoolean(CommonConstants.Server.CONFIG_OF_ENABLE_SHUTDOWN_DELAY, true)) {
- Uninterruptibles.sleepUninterruptibly(_maxQueryTimeMs, TimeUnit.MILLISECONDS);
+ long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_SHUTDOWN_TIMEOUT_MS, DEFAULT_SHUTDOWN_TIMEOUT_MS);
+ if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
+ shutdownQueryCheck(endTimeMs);
}
- waitUntilNoIncomingQueries(System.currentTimeMillis(), endTime);
_helixManager.disconnect();
_serverInstance.shutDown();
- waitUntilNoOnlineResources(System.currentTimeMillis(), endTime);
- }
-
- private void waitUntilNoIncomingQueries(long startTime, final long endTime) {
- if (startTime >= endTime) {
- LOGGER.warn("Skip waiting until no incoming queries.");
- return;
+ if (_serverConf.getBoolean(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
+ shutdownResourceCheck(endTimeMs);
}
- LOGGER.info("Waiting upto {}ms until Pinot server doesn't receive any incoming queries...", (endTime - startTime));
- long currentTime = startTime;
+ }
- while (currentTime < endTime) {
- if (noIncomingQueries(currentTime)) {
- LOGGER.info("No incoming query within {}ms. Total waiting Time: {}ms", _checkIntervalTimeMs,
- (currentTime - startTime));
- return;
+ /**
+ * When shutting down the server, drains the queries and waits for all the existing queries to be finished.
+ *
+ * @param endTimeMs Deadline for draining queries (absolute time, comparable to System.currentTimeMillis())
+ */
+ private void shutdownQueryCheck(long endTimeMs) {
+ LOGGER.info("Starting shutdown query check");
+ long startTimeMs = System.currentTimeMillis();
+
+ long maxQueryTimeMs = _serverConf.getLong(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+ long noQueryThresholdMs = _serverConf.getLong(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs);
+
+ // Drain queries: wait until no query has been received for noQueryThresholdMs, or the deadline is reached
+ boolean queriesDrained = false;
+ long currentTimeMs;
+ while ((currentTimeMs = System.currentTimeMillis()) < endTimeMs) {
+ long latestQueryTimeMs = _serverInstance.getLatestQueryTime();
+ if (currentTimeMs >= latestQueryTimeMs + noQueryThresholdMs) {
+ LOGGER.info("Finished draining queries (no query received within {}ms) after {}ms",
+ currentTimeMs - latestQueryTimeMs, currentTimeMs - startTimeMs);
+ queriesDrained = true;
+ break;
}
-
try {
- Thread.sleep(Math.min(_maxQueryTimeMs, (endTime - currentTime)));
+ Thread.sleep(Math.min(latestQueryTimeMs + noQueryThresholdMs - currentTimeMs, endTimeMs - currentTimeMs));
} catch (InterruptedException e) {
- LOGGER.error("Interrupted when waiting for Pinot server not to receive any queries.", e);
+ LOGGER.error("Got interrupted while draining queries", e);
Thread.currentThread().interrupt();
- return;
+ break;
+ }
+ }
+ if (queriesDrained) {
+ // Ensure all the existing queries are finished
+ long latestQueryFinishTimeMs = _serverInstance.getLatestQueryTime() + maxQueryTimeMs;
+ if (latestQueryFinishTimeMs > currentTimeMs) {
+ try {
+ Thread.sleep(latestQueryFinishTimeMs - currentTimeMs);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Got interrupted while waiting for all the existing queries to be finished", e);
+ }
}
- currentTime = System.currentTimeMillis();
+ } else {
+ LOGGER.warn("Failed to drain queries within {}ms", System.currentTimeMillis() - startTimeMs);
}
- LOGGER.error("Reach timeout when waiting for no incoming queries! Max waiting time: {}ms", _maxShutdownWaitTimeMs);
}
/**
- * Init a helix spectator to watch the external view updates.
+ * When shutting down the server, waits for all the resources to turn OFFLINE (all partitions served by the server
+ * are neither ONLINE nor CONSUMING).
+ *
+ * @param endTimeMs Deadline for the check (absolute time, comparable to System.currentTimeMillis())
*/
- private void waitUntilNoOnlineResources(long startTime, final long endTime) {
- if (startTime >= endTime) {
- LOGGER.warn("Skip waiting until no online resources.");
+ private void shutdownResourceCheck(long endTimeMs) {
+ LOGGER.info("Starting shutdown resource check");
+ long startTimeMs = System.currentTimeMillis();
+
+ if (startTimeMs >= endTimeMs) {
+ LOGGER.warn("Skipping shutdown resource check because shutdown timeout is already reached");
return;
}
- LOGGER.info("Waiting upto {}ms until no online resources...", (endTime - startTime));
- // Initialize a helix spectator.
- HelixManager spectatorManager =
- HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
+ HelixAdmin helixAdmin = null;
try {
- spectatorManager.connect();
-
- Set<String> resources = fetchLatestTableResources(spectatorManager.getClusterManagmentTool());
+ helixAdmin = new ZKHelixAdmin(_zkServers);
+
+ // Monitor all enabled table resources that the server serves
+ Set<String> resourcesToMonitor = new HashSet<>();
+ for (String resourceName : helixAdmin.getResourcesInCluster(_helixClusterName)) {
+ if (TableNameBuilder.isTableResource(resourceName)) {
+ IdealState idealState = helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
+ if (idealState == null || !idealState.isEnabled()) {
+ continue;
+ }
+ for (String partition : idealState.getPartitionSet()) {
+ if (idealState.getInstanceSet(partition).contains(_instanceId)) {
+ resourcesToMonitor.add(resourceName);
+ break;
+ }
+ }
+ }
+ }
- long currentTime = startTime;
- while (currentTime < endTime) {
- if (noOnlineResources(spectatorManager, resources)) {
- LOGGER.info("No online resource within {}ms. Total waiting Time: {}ms", _checkIntervalTimeMs,
- (currentTime - startTime));
+ long checkIntervalMs = _serverConf
+ .getLong(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS, DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
+ while (System.currentTimeMillis() < endTimeMs) {
+ Iterator<String> iterator = resourcesToMonitor.iterator();
+ while (iterator.hasNext()) {
+ if (isResourceOffline(helixAdmin, iterator.next())) {
+ iterator.remove();
+ } else {
+ // Do not check remaining resources if one resource is not OFFLINE
+ break;
+ }
+ }
+ if (resourcesToMonitor.isEmpty()) {
+ LOGGER.info("All resources are OFFLINE after {}ms", System.currentTimeMillis() - startTimeMs);
return;
}
-
try {
- Thread.sleep(Math.min(_checkIntervalTimeMs, (endTime - currentTime)));
+ Thread.sleep(Math.min(checkIntervalMs, endTimeMs - System.currentTimeMillis()));
} catch (InterruptedException e) {
- LOGGER.error("Interrupted when waiting for no online resources.", e);
+ LOGGER.warn("Got interrupted while waiting for all resources OFFLINE", e);
Thread.currentThread().interrupt();
- return;
+ break;
}
- currentTime = System.currentTimeMillis();
}
- LOGGER.error(
- "Reach timeout waiting for no online resources! Forcing Pinot server to shutdown. Max waiting time: {}ms",
- _maxShutdownWaitTimeMs);
- } catch (Exception e) {
- LOGGER.error("Exception waiting until no online resources. Skip checking external view.", e);
- } finally {
- spectatorManager.disconnect();
- }
- }
-
- private boolean noIncomingQueries(long currentTime) {
- return currentTime > _serverInstance.getLatestQueryTime() + _checkIntervalTimeMs;
- }
- private boolean noOnlineResources(HelixManager spectatorManager, Set<String> resources) {
- Iterator<String> iterator = resources.iterator();
- while (iterator.hasNext()) {
- String resourceName = iterator.next();
- ExternalView externalView =
- spectatorManager.getClusterManagmentTool().getResourceExternalView(_helixClusterName, resourceName);
- if (externalView == null) {
- iterator.remove();
- continue;
- }
- for (String partition : externalView.getPartitionSet()) {
- Map<String, String> instanceStateMap = externalView.getStateMap(partition);
- if (instanceStateMap.containsKey(_instanceId)) {
- if ("ONLINE".equals(instanceStateMap.get(_instanceId))) {
- return false;
- }
+ // Check all remaining resources
+ Iterator<String> iterator = resourcesToMonitor.iterator();
+ while (iterator.hasNext()) {
+ if (isResourceOffline(helixAdmin, iterator.next())) {
+ iterator.remove();
}
}
- iterator.remove();
+ long currentTimeMs = System.currentTimeMillis();
+ if (resourcesToMonitor.isEmpty()) {
+ LOGGER.info("All resources are OFFLINE after {}ms", currentTimeMs - startTimeMs);
+ } else {
+ LOGGER.warn("There are still {} resources ONLINE within {}ms: {}", resourcesToMonitor.size(),
+ currentTimeMs - startTimeMs, resourcesToMonitor);
+ }
+ } finally {
+ if (helixAdmin != null) {
+ helixAdmin.close();
+ }
}
- return true;
}
- private Set<String> fetchLatestTableResources(HelixAdmin helixAdmin) {
- Set<String> resourcesToMonitor = new HashSet<>();
- for (String resourceName : helixAdmin.getResourcesInCluster(_helixClusterName)) {
- // Only monitor table resources
- if (!TableNameBuilder.isTableResource(resourceName)) {
- continue;
- }
- // Only monitor enabled resources
- IdealState idealState = helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
- if (idealState.isEnabled()) {
- for (String partitionName : idealState.getPartitionSet()) {
- if (idealState.getInstanceSet(partitionName).contains(_instanceId)) {
- resourcesToMonitor.add(resourceName);
- break;
- }
- }
+ private boolean isResourceOffline(HelixAdmin helixAdmin, String resource) {
+ ExternalView externalView = helixAdmin.getResourceExternalView(_helixClusterName, resource);
+ // Treat deleted resource as OFFLINE
+ if (externalView == null) {
+ return true;
+ }
+ for (String partition : externalView.getPartitionSet()) {
+ Map<String, String> instanceStateMap = externalView.getStateMap(partition);
+ String state = instanceStateMap.get(_instanceId);
+ if ("ONLINE".equals(state) || "CONSUMING".equals(state)) {
+ return false;
}
}
- return resourcesToMonitor;
+ return true;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org