You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/13 23:54:42 UTC

[pinot] branch master updated: Set 'shutdownInProgress' in server config when server starts before the startup check (#8525)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 12227c5d70 Set 'shutdownInProgress' in server config when server starts before the startup check (#8525)
12227c5d70 is described below

commit 12227c5d70640f9c9d10f53cb8f555d9dc484a07
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Apr 13 16:54:37 2022 -0700

    Set 'shutdownInProgress' in server config when server starts before the startup check (#8525)
    
    For a newly joined server, or a server that was not shut down normally, the 'shutdownInProgress' flag won't be set properly when it starts. This can cause brokers to route queries to it before it finishes the startup check, which can cause performance issues.
    This PR sets the flag when the server starts if it is not already set. It also reduces the number of instance config updates by checking the existing values before setting them.
---
 .../server/starter/helix/BaseServerStarter.java    | 119 ++++++++++++---------
 1 file changed, 66 insertions(+), 53 deletions(-)

diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index d940ccbe2d..8a1795853f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -28,7 +28,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
@@ -287,9 +286,13 @@ public abstract class BaseServerStarter implements ServiceStartable {
         new ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
   }
 
-  private void updateInstanceConfigIfNeeded() {
+  private void updateInstanceConfigIfNeeded(ServerConf serverConf) {
     InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_helixManager, _instanceId);
+
+    // Update hostname and port
     boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
+
+    // Update tags
     updated |= HelixHelper.addDefaultTags(instanceConfig, () -> {
       if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_helixManager.getHelixPropertyStore())) {
         return Arrays.asList(TagNameUtils.getOfflineTagForTenant(null), TagNameUtils.getRealtimeTagForTenant(null));
@@ -297,28 +300,78 @@ public abstract class BaseServerStarter implements ServiceStartable {
         return Collections.singletonList(Helix.UNTAGGED_SERVER_INSTANCE);
       }
     });
+
+    // Update admin HTTP/HTTPS port
+    int adminHttpPort = Integer.MIN_VALUE;
+    int adminHttpsPort = Integer.MIN_VALUE;
+    for (ListenerConfig listenerConfig : _listenerConfigs) {
+      String protocol = listenerConfig.getProtocol();
+      if (CommonConstants.HTTP_PROTOCOL.equals(protocol)) {
+        adminHttpPort = listenerConfig.getPort();
+      } else if (CommonConstants.HTTPS_PROTOCOL.equals(protocol)) {
+        adminHttpsPort = listenerConfig.getPort();
+      }
+    }
+    ZNRecord znRecord = instanceConfig.getRecord();
+    Map<String, String> simpleFields = znRecord.getSimpleFields();
+    updated |= updatePortIfNeeded(simpleFields, Instance.ADMIN_PORT_KEY, adminHttpPort);
+    updated |= updatePortIfNeeded(simpleFields, Instance.ADMIN_HTTPS_PORT_KEY, adminHttpsPort);
+
+    // Update netty TLS port
+    int nettyTlsPort = serverConf.isNettyTlsServerEnabled() ? serverConf.getNettyTlsPort() : Integer.MIN_VALUE;
+    updated |= updatePortIfNeeded(simpleFields, Instance.NETTY_TLS_PORT_KEY, nettyTlsPort);
+
+    // Update gRPC port
+    int grpcPort = serverConf.isEnableGrpcServer() ? serverConf.getGrpcPort() : Integer.MIN_VALUE;
+    updated |= updatePortIfNeeded(simpleFields, Instance.GRPC_PORT_KEY, grpcPort);
+
     // Update instance config with environment properties
     if (_pinotEnvironmentProvider != null) {
       // Retrieve failure domain information and add to the environment properties map
       String failureDomain = _pinotEnvironmentProvider.getFailureDomain();
       Map<String, String> environmentProperties =
           Collections.singletonMap(CommonConstants.INSTANCE_FAILURE_DOMAIN, failureDomain);
-
-      // Fetch existing environment properties map from instance configs
-      ZNRecord znRecord = instanceConfig.getRecord();
-      Map<String, String> existingEnvironmentConfigsMap = znRecord.getMapField(CommonConstants.ENVIRONMENT_IDENTIFIER);
-
-      if (!environmentProperties.equals(existingEnvironmentConfigsMap)) {
-        LOGGER.info("Updating instance: {} with environment properties: {}", environmentProperties, _instanceId);
+      if (!environmentProperties.equals(znRecord.getMapField(CommonConstants.ENVIRONMENT_IDENTIFIER))) {
+        LOGGER.info("Updating instance: {} with environment properties: {}", _instanceId, environmentProperties);
         znRecord.setMapField(CommonConstants.ENVIRONMENT_IDENTIFIER, environmentProperties);
         updated = true;
       }
     }
+
+    // If 'shutdownInProgress' is not set (new instance, or not shut down properly), set it to prevent brokers routing
+    // queries to it before finishing the startup check
+    if (!Boolean.parseBoolean(simpleFields.get(Helix.IS_SHUTDOWN_IN_PROGRESS))) {
+      LOGGER.info(
+          "Updating instance: {} with '{}' to prevent brokers routing queries to it before finishing the startup check",
+          _instanceId, Helix.IS_SHUTDOWN_IN_PROGRESS);
+      simpleFields.put(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(true));
+      updated = true;
+    }
+
     if (updated) {
       HelixHelper.updateInstanceConfig(_helixManager, instanceConfig);
     }
   }
 
+  private boolean updatePortIfNeeded(Map<String, String> instanceConfigSimpleFields, String key, int port) {
+    String existingPortStr = instanceConfigSimpleFields.get(key);
+    if (port > 0) {
+      String portStr = Integer.toString(port);
+      if (!portStr.equals(existingPortStr)) {
+        LOGGER.info("Updating '{}' for instance: {} to: {}", key, _instanceId, port);
+        instanceConfigSimpleFields.put(key, portStr);
+        return true;
+      }
+    } else {
+      if (existingPortStr != null) {
+        LOGGER.info("Removing '{}' from instance: {}", key, _instanceId);
+        instanceConfigSimpleFields.remove(key);
+        return true;
+      }
+    }
+    return false;
+  }
+
   private void setupHelixSystemProperties() {
     // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
     // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
@@ -388,7 +441,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
     String accessControlFactoryClass =
         _serverConf.getProperty(Server.ACCESS_CONTROL_FACTORY_CLASS, Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS);
     LOGGER.info("Using class: {} as the AccessControlFactory", accessControlFactoryClass);
-    final AccessControlFactory accessControlFactory;
+    AccessControlFactory accessControlFactory;
     try {
       accessControlFactory = PluginManager.get().createInstance(accessControlFactoryClass);
     } catch (Exception e) {
@@ -402,8 +455,8 @@ public abstract class BaseServerStarter implements ServiceStartable {
     ControllerLeaderLocator.create(_helixManager);
     ServerSegmentCompletionProtocolHandler.init(
         _serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
-    ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
-    _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager, accessControlFactory);
+    ServerConf serverConf = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
+    _serverInstance = new ServerInstance(serverConf, _helixManager, accessControlFactory);
     ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
     initSegmentFetcher(_serverConf);
@@ -418,53 +471,13 @@ public abstract class BaseServerStarter implements ServiceStartable {
     LOGGER.info("Connecting Helix manager");
     _helixManager.connect();
     _helixAdmin = _helixManager.getClusterManagmentTool();
-    updateInstanceConfigIfNeeded();
+    updateInstanceConfigIfNeeded(serverConf);
 
     // Start restlet server for admin API endpoint
-
-    // Update admin API port
     LOGGER.info("Starting server admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApiApplication = new AdminApiApplication(_serverInstance, accessControlFactory, _serverConf);
     _adminApiApplication.start(_listenerConfigs);
 
-    // Update http admin port
-    Optional<ListenerConfig> adminApiHttp =
-        _listenerConfigs.stream().filter(listener -> CommonConstants.HTTP_PROTOCOL.equals(listener.getProtocol()))
-            .findFirst();
-    if (adminApiHttp.isPresent()) {
-      _helixAdmin.setConfig(_instanceConfigScope,
-          Collections.singletonMap(Instance.ADMIN_PORT_KEY, String.valueOf(adminApiHttp.get().getPort())));
-    } else {
-      _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList(Instance.ADMIN_PORT_KEY));
-    }
-
-    // Update https admin port
-    Optional<ListenerConfig> adminApiHttps =
-        _listenerConfigs.stream().filter(listener -> CommonConstants.HTTPS_PROTOCOL.equals(listener.getProtocol()))
-            .findFirst();
-    if (adminApiHttps.isPresent()) {
-      _helixAdmin.setConfig(_instanceConfigScope,
-          Collections.singletonMap(Instance.ADMIN_HTTPS_PORT_KEY, String.valueOf(adminApiHttps.get().getPort())));
-    } else {
-      _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList(Instance.ADMIN_HTTPS_PORT_KEY));
-    }
-
-    // Update netty TLS port
-    if (serverInstanceConfig.isNettyTlsServerEnabled()) {
-      _helixAdmin.setConfig(_instanceConfigScope, Collections.singletonMap(Instance.NETTY_TLS_PORT_KEY,
-          String.valueOf(serverInstanceConfig.getNettyTlsPort())));
-    } else {
-      _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList(Instance.NETTY_TLS_PORT_KEY));
-    }
-
-    // Update gRPC port
-    if (serverInstanceConfig.isEnableGrpcServer()) {
-      _helixAdmin.setConfig(_instanceConfigScope,
-          Collections.singletonMap(Instance.GRPC_PORT_KEY, String.valueOf(serverInstanceConfig.getGrpcPort())));
-    } else {
-      _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList(Instance.GRPC_PORT_KEY));
-    }
-
     // Init QueryRewriterFactory
     LOGGER.info("Initializing QueryRewriterFactory");
     QueryRewriterFactory.init(_serverConf.getProperty(Server.CONFIG_OF_SERVER_QUERY_REWRITER_CLASS_NAMES));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org