You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/08/27 17:14:40 UTC
[incubator-pinot] branch master updated: Starts Broker and Server
in parallel when using ServiceManager (#5917)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 21a372b Starts Broker and Server in parallel when using ServiceManager (#5917)
21a372b is described below
commit 21a372b2f58f9c6e27fe9710d6952ed519342061
Author: Adrian Cole <ad...@users.noreply.github.com>
AuthorDate: Fri Aug 28 01:14:31 2020 +0800
Starts Broker and Server in parallel when using ServiceManager (#5917)
* Starts Broker and Server in parallel when using ServiceManager
Fixes #5876
* rejig
* also time SM
* block until services complete
* getZKHelixManager can throw
* Don't check helix as it spams logs
* matches case format
* Ensures any failure results in exit code 1
---
.../admin/command/StartServiceManagerCommand.java | 132 +++++++++++++++++----
1 file changed, 111 insertions(+), 21 deletions(-)
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
index 4fcd6b7..caa1ca0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
@@ -23,10 +23,15 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.PINOT_SERVICE_
import java.io.File;
import java.net.SocketException;
import java.net.UnknownHostException;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.ControllerConf;
@@ -41,13 +46,20 @@ import org.slf4j.LoggerFactory;
/**
- * Class to implement StartPinotService command.
+ * Starts services in the following order, and returns false on any startup failure:
*
+ * <p><ol>
+ * <li>{@link PinotServiceManager}</li>
+ * <li>Bootstrap services in role {@link ServiceRole#CONTROLLER}</li>
+ * <li>All remaining bootstrap services in parallel</li>
+ * </ol>
*/
public class StartServiceManagerCommand extends AbstractBaseAdminCommand implements Command {
private static final Logger LOGGER = LoggerFactory.getLogger(StartServiceManagerCommand.class);
- private final List<Map<String, Object>> _bootstrapConfigurations = new ArrayList<>();
- private final String[] BOOTSTRAP_SERVICES = new String[]{"CONTROLLER", "BROKER", "SERVER"};
+ private static final long startTick = System.nanoTime();
+ private static final String[] BOOTSTRAP_SERVICES = new String[]{"CONTROLLER", "BROKER", "SERVER"};
+ // multiple instances allowed per role for testing many minions
+ private final List<Entry<ServiceRole, Map<String, Object>>> _bootstrapConfigurations = new ArrayList<>();
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help;
@@ -150,21 +162,28 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme
throws Exception {
try {
LOGGER.info("Executing command: " + toString());
- _pinotServiceManager = new PinotServiceManager(_zkAddress, _clusterName, _port);
- _pinotServiceManager.start();
+ if (!startPinotService("SERVICE_MANAGER", this::startServiceManager)) {
+ return false;
+ }
+
if (_bootstrapConfigPaths != null) {
for (String configPath : _bootstrapConfigPaths) {
- _bootstrapConfigurations.add(readConfigFromFile(configPath));
+ Map<String, Object> config = readConfigFromFile(configPath);
+ ServiceRole role = ServiceRole.valueOf(config.get(PINOT_SERVICE_ROLE).toString());
+ addBootstrapService(role, config);
}
} else if (_bootstrapServices != null) {
for (String service : _bootstrapServices) {
- ServiceRole serviceRole = ServiceRole.valueOf(service.toUpperCase());
- addBootstrapService(serviceRole, getDefaultConfig(serviceRole));
+ ServiceRole role = ServiceRole.valueOf(service.toUpperCase());
+ Map<String, Object> config = getDefaultConfig(role);
+ addBootstrapService(role, config);
}
}
- for (Map<String, Object> properties : _bootstrapConfigurations) {
- startPinotService(properties);
+
+ if (!startBootstrapServices()) {
+ return false;
}
+
String pidFile = ".pinotAdminService-" + System.currentTimeMillis() + ".pid";
savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile);
return true;
@@ -175,6 +194,12 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme
}
}
+ private String startServiceManager() {
+ _pinotServiceManager = new PinotServiceManager(_zkAddress, _clusterName, _port);
+ _pinotServiceManager.start();
+ return _pinotServiceManager.getInstanceId();
+ }
+
private Map<String, Object> getDefaultConfig(ServiceRole serviceRole)
throws SocketException, UnknownHostException {
switch (serviceRole) {
@@ -191,26 +216,91 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme
}
}
- private void startPinotService(Map<String, Object> properties) {
- startPinotService(ServiceRole.valueOf(properties.get(PINOT_SERVICE_ROLE).toString()), properties);
+ /**
+ * Starts any controller(s) synchronously first, then starts all remaining services in parallel.
+ */
+ private boolean startBootstrapServices() {
+ if (_bootstrapConfigurations.isEmpty()) return true;
+
+ List<Entry<ServiceRole, Map<String, Object>>> parallelConfigs = new ArrayList<>();
+
+ // Start controller(s) synchronously so that other services don't fail
+ //
+ // Note: Technically, we don't need to do this if the cluster already exists, but checking the
+ // cluster takes time and clutters logs with errors when it doesn't exist.
+ for (Entry<ServiceRole, Map<String, Object>> roleToConfig : _bootstrapConfigurations) {
+ if (roleToConfig.getKey() == ServiceRole.CONTROLLER) {
+ if (!startPinotService(ServiceRole.CONTROLLER,
+ () -> _pinotServiceManager.startRole(ServiceRole.CONTROLLER, roleToConfig.getValue()))) {
+ return false;
+ }
+ } else {
+ parallelConfigs.add(roleToConfig);
+ }
+ }
+
+ return startBootstrapServicesInParallel(_pinotServiceManager, parallelConfigs);
}
- public boolean startPinotService(ServiceRole role, Map<String, Object> properties) {
+ static boolean startBootstrapServicesInParallel(
+ PinotServiceManager pinotServiceManager,
+ List<Entry<ServiceRole, Map<String, Object>>> parallelConfigs
+ ) {
+ if (parallelConfigs.isEmpty()) return true;
+
+ // Set to true when any service fails to start
+ AtomicBoolean failed = new AtomicBoolean(false);
+
+ List<Thread> threads = new ArrayList<>();
+ for (Entry<ServiceRole, Map<String, Object>> roleToConfig : parallelConfigs) {
+ ServiceRole role = roleToConfig.getKey();
+ Map<String, Object> config = roleToConfig.getValue();
+ Thread thread = new Thread("Start a Pinot [" + role + "]") {
+ @Override public void run() {
+ if (!startPinotService(role, () -> pinotServiceManager.startRole(role, config))) {
+ failed.set(true);
+ }
+ }
+ };
+ threads.add(thread);
+ // Unhandled exceptions are likely logged, so we don't need to re-log here
+ thread.setUncaughtExceptionHandler((t, e) -> failed.set(true));
+ thread.start();
+ }
+
+ // Block until service startup completes
+ for (Thread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ return !failed.get();
+ }
+
+ private static boolean startPinotService(Object role, Callable<String> serviceStarter) {
try {
- String instanceId = _pinotServiceManager.startRole(role, properties);
- LOGGER.info("Started Pinot [{}] Instance [{}].", role, instanceId);
+ LOGGER.info("Starting a Pinot [{}] at {}s since launch", role, startOffsetSeconds());
+ String instanceId = serviceStarter.call();
+ LOGGER.info("Started Pinot [{}] instance [{}] at {}s since launch", role, instanceId, startOffsetSeconds());
} catch (Exception e) {
- LOGGER.error(String.format("Failed to start a [ %s ] Service", role), e);
+ LOGGER.error(String.format("Failed to start a Pinot [%s] at %s since launch", role, startOffsetSeconds()), e);
return false;
}
return true;
}
- public StartServiceManagerCommand addBootstrapService(ServiceRole role, Map<String, Object> properties) {
- properties.put(PINOT_SERVICE_ROLE, role.toString());
-
- _bootstrapConfigurations.add(properties);
-
+ /** Returns the time elapsed since {@code startTick} in seconds, with millisecond precision, e.g. 1.002. */
+ private static float startOffsetSeconds() {
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTick) / 1000f;
+ }
+
+ public StartServiceManagerCommand addBootstrapService(ServiceRole role, Map<String, Object> config) {
+ if (role == null) throw new NullPointerException("role == null");
+ config.put(PINOT_SERVICE_ROLE, role.toString()); // Ensure config has role key
+ _bootstrapConfigurations.add(new SimpleImmutableEntry<>(role, config));
return this;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org