You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/08/27 17:14:40 UTC

[incubator-pinot] branch master updated: Starts Broker and Server in parallel when using ServiceManager (#5917)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 21a372b  Starts Broker and Server in parallel when using ServiceManager (#5917)
21a372b is described below

commit 21a372b2f58f9c6e27fe9710d6952ed519342061
Author: Adrian Cole <ad...@users.noreply.github.com>
AuthorDate: Fri Aug 28 01:14:31 2020 +0800

    Starts Broker and Server in parallel when using ServiceManager (#5917)
    
    * Starts Broker and Server in parallel when using ServiceManager
    
    Fixes #5876
    
    * rejig
    
    * also time SM
    
    * block until services complete
    
    * getZKHelixManager can throw
    
    * Don't check helix as it spams logs
    
    * matches case format
    
    * Ensures any failure results in exit code 1
---
 .../admin/command/StartServiceManagerCommand.java  | 132 +++++++++++++++++----
 1 file changed, 111 insertions(+), 21 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
index 4fcd6b7..caa1ca0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
@@ -23,10 +23,15 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.PINOT_SERVICE_
 import java.io.File;
 import java.net.SocketException;
 import java.net.UnknownHostException;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
@@ -41,13 +46,20 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Class to implement StartPinotService command.
+ * Starts services in the following order, and returns false on any startup failure:
  *
+ * <p><ol>
+ * <li>{@link PinotServiceManager}</li>
+ * <li>Bootstrap services in role {@link ServiceRole#CONTROLLER}</li>
+ * <li>All remaining bootstrap services in parallel</li>
+ * </ol>
  */
 public class StartServiceManagerCommand extends AbstractBaseAdminCommand implements Command {
   private static final Logger LOGGER = LoggerFactory.getLogger(StartServiceManagerCommand.class);
-  private final List<Map<String, Object>> _bootstrapConfigurations = new ArrayList<>();
-  private final String[] BOOTSTRAP_SERVICES = new String[]{"CONTROLLER", "BROKER", "SERVER"};
+  private static final long startTick = System.nanoTime();
+  private static final String[] BOOTSTRAP_SERVICES = new String[]{"CONTROLLER", "BROKER", "SERVER"};
+  // multiple instances allowed per role for testing many minions
+  private final List<Entry<ServiceRole, Map<String, Object>>> _bootstrapConfigurations = new ArrayList<>();
 
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help;
@@ -150,21 +162,28 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme
       throws Exception {
     try {
       LOGGER.info("Executing command: " + toString());
-      _pinotServiceManager = new PinotServiceManager(_zkAddress, _clusterName, _port);
-      _pinotServiceManager.start();
+      if (!startPinotService("SERVICE_MANAGER", this::startServiceManager)) {
+        return false;
+      }
+
       if (_bootstrapConfigPaths != null) {
         for (String configPath : _bootstrapConfigPaths) {
-          _bootstrapConfigurations.add(readConfigFromFile(configPath));
+          Map<String, Object> config = readConfigFromFile(configPath);
+          ServiceRole role = ServiceRole.valueOf(config.get(PINOT_SERVICE_ROLE).toString());
+          addBootstrapService(role, config);
         }
       } else if (_bootstrapServices != null) {
         for (String service : _bootstrapServices) {
-          ServiceRole serviceRole = ServiceRole.valueOf(service.toUpperCase());
-          addBootstrapService(serviceRole, getDefaultConfig(serviceRole));
+          ServiceRole role = ServiceRole.valueOf(service.toUpperCase());
+          Map<String, Object> config = getDefaultConfig(role);
+          addBootstrapService(role, config);
         }
       }
-      for (Map<String, Object> properties : _bootstrapConfigurations) {
-        startPinotService(properties);
+
+      if (!startBootstrapServices()) {
+        return false;
       }
+
       String pidFile = ".pinotAdminService-" + System.currentTimeMillis() + ".pid";
       savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile);
       return true;
@@ -175,6 +194,12 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme
     }
   }
 
+  private String startServiceManager() {
+    _pinotServiceManager = new PinotServiceManager(_zkAddress, _clusterName, _port);
+    _pinotServiceManager.start();
+    return _pinotServiceManager.getInstanceId();
+  }
+
   private Map<String, Object> getDefaultConfig(ServiceRole serviceRole)
       throws SocketException, UnknownHostException {
     switch (serviceRole) {
@@ -191,26 +216,91 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme
     }
   }
 
-  private void startPinotService(Map<String, Object> properties) {
-    startPinotService(ServiceRole.valueOf(properties.get(PINOT_SERVICE_ROLE).toString()), properties);
+  /**
+   * Starts a controller synchronously unless the cluster already exists. Other services start in parallel.
+   */
+  private boolean startBootstrapServices() {
+    if (_bootstrapConfigurations.isEmpty()) return true;
+
+    List<Entry<ServiceRole, Map<String, Object>>> parallelConfigs = new ArrayList<>();
+
+    // Start controller(s) synchronously so that other services don't fail
+    //
+    // Note: Technically, we don't need to do this if the cluster already exists, but checking the
+    // cluster takes time and clutters logs with errors when it doesn't exist.
+    for (Entry<ServiceRole, Map<String, Object>> roleToConfig : _bootstrapConfigurations) {
+      if (roleToConfig.getKey() == ServiceRole.CONTROLLER) {
+        if (!startPinotService(ServiceRole.CONTROLLER,
+            () -> _pinotServiceManager.startRole(ServiceRole.CONTROLLER, roleToConfig.getValue()))) {
+          return false;
+        }
+      } else {
+        parallelConfigs.add(roleToConfig);
+      }
+    }
+
+    return startBootstrapServicesInParallel(_pinotServiceManager, parallelConfigs);
   }
 
-  public boolean startPinotService(ServiceRole role, Map<String, Object> properties) {
+  static boolean startBootstrapServicesInParallel(
+      PinotServiceManager pinotServiceManager,
+      List<Entry<ServiceRole, Map<String, Object>>> parallelConfigs
+  ) {
+    if (parallelConfigs.isEmpty()) return true;
+
+    // True is when everything succeeded
+    AtomicBoolean failed = new AtomicBoolean(false);
+
+    List<Thread> threads = new ArrayList<>();
+    for (Entry<ServiceRole, Map<String, Object>> roleToConfig : parallelConfigs) {
+      ServiceRole role = roleToConfig.getKey();
+      Map<String, Object> config = roleToConfig.getValue();
+      Thread thread = new Thread("Start a Pinot [" + role + "]") {
+        @Override public void run() {
+          if (!startPinotService(role, () -> pinotServiceManager.startRole(role, config))) {
+            failed.set(true);
+          }
+        }
+      };
+      threads.add(thread);
+      // Unhandled exceptions are likely logged, so we don't need to re-log here
+      thread.setUncaughtExceptionHandler((t, e) -> failed.set(true));
+      thread.start();
+    }
+
+    // Block until service startup completes
+    for (Thread thread : threads) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+    return !failed.get();
+  }
+
+  private static boolean startPinotService(Object role, Callable<String> serviceStarter) {
     try {
-      String instanceId = _pinotServiceManager.startRole(role, properties);
-      LOGGER.info("Started Pinot [{}] Instance [{}].", role, instanceId);
+      LOGGER.info("Starting a Pinot [{}] at {}s since launch", role, startOffsetSeconds());
+      String instanceId = serviceStarter.call();
+      LOGGER.info("Started Pinot [{}] instance [{}] at {}s since launch", role, instanceId, startOffsetSeconds());
     } catch (Exception e) {
-      LOGGER.error(String.format("Failed to start a [ %s ] Service", role), e);
+      LOGGER.error(String.format("Failed to start a Pinot [%s] at %s since launch", role, startOffsetSeconds()), e);
       return false;
     }
     return true;
   }
 
-  public StartServiceManagerCommand addBootstrapService(ServiceRole role, Map<String, Object> properties) {
-    properties.put(PINOT_SERVICE_ROLE, role.toString());
-    
-    _bootstrapConfigurations.add(properties);
-    
+  /** Creates millis precision unit of seconds. ex 1.002 */
+  private static float startOffsetSeconds() {
+    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTick) / 1000f;
+  }
+
+  public StartServiceManagerCommand addBootstrapService(ServiceRole role, Map<String, Object> config) {
+    if (role == null) throw new NullPointerException("role == null");
+    config.put(PINOT_SERVICE_ROLE, role.toString()); // Ensure config has role key
+    _bootstrapConfigurations.add(new SimpleImmutableEntry<>(role, config));
     return this;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org