You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/07/02 02:59:14 UTC

[pinot] branch master updated: adding MultiStageEngineQuickStart (#8980)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eda488f5d7 adding MultiStageEngineQuickStart (#8980)
eda488f5d7 is described below

commit eda488f5d7aeb68255c3452d00daacffbbbdd477
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Fri Jul 1 19:59:07 2022 -0700

    adding MultiStageEngineQuickStart (#8980)
    
    * adding MultiStageEngineQuickStart
    
    * address diff comments
    
    * add more queries
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../tests/MultiStageEngineIntegrationTest.java     |   3 +-
 .../query/runtime/utils/ServerRequestUtils.java    |   2 +
 .../apache/pinot/query/service/QueryConfig.java    |   4 +-
 .../server/starter/helix/BaseServerStarter.java    |   5 +-
 .../pinot/tools/MultistageEngineQuickStart.java    | 129 +++++++++++++++++++++
 .../tools/admin/command/PostQueryCommand.java      |  18 ++-
 .../tools/admin/command/QuickstartRunner.java      |  15 ++-
 .../tools/admin/command/StartBrokerCommand.java    |  17 ++-
 .../tools/admin/command/StartServerCommand.java    |  39 ++++++-
 .../admin/command/StartServiceManagerCommand.java  |   7 +-
 .../apache/pinot/tools/utils/PinotConfigUtils.java |  12 +-
 .../baseballStats_offline_table_config.json        |   2 +-
 .../baseballStats_offline_table_config.json        |   2 +-
 13 files changed, 236 insertions(+), 19 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 0d78d2b182..e5c92c1799 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -115,7 +115,8 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest
     return new Object[][] {
         new Object[]{"SELECT COUNT(*) FROM mytable_OFFLINE WHERE Carrier='AA'", 1, 1},
         new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>1000", 2, 73},
-        new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE WHERE CarrierDelay=15 AND ArrDelay>20", 10, 2},
+        new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE"
+            + " WHERE CarrierDelay=15 AND ArrDelay>20", 172, 2},
         new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.Origin = b.Origin "
             + " WHERE a.Carrier='AA' AND a.ArrDelay>1000 AND b.ArrDelay>1000", 2, 146}
     };
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 15a8b0194c..738992fe52 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -46,6 +46,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
  * conversion step is needed so that the V2 query plan can be converted into a compatible format to run V1 executor.
  */
 public class ServerRequestUtils {
+  private static final int DEFAULT_LEAF_NODE_LIMIT = 1_000_000;
 
   private ServerRequestUtils() {
     // do not instantiate.
@@ -84,6 +85,7 @@ public class ServerRequestUtils {
 
   public static PinotQuery constructPinotQuery(DistributedStagePlan distributedStagePlan) {
     PinotQuery pinotQuery = new PinotQuery();
+    pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
     pinotQuery.setExplain(false);
     walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery);
     return pinotQuery;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index 8de432f3ea..c0bbb08174 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -23,13 +23,13 @@ package org.apache.pinot.query.service;
  */
 public class QueryConfig {
   public static final String KEY_OF_QUERY_SERVER_PORT = "pinot.query.server.port";
-  public static final int DEFAULT_QUERY_SERVER_PORT = -1;
+  public static final int DEFAULT_QUERY_SERVER_PORT = 0;
 
   public static final String KEY_OF_QUERY_RUNNER_HOSTNAME = "pinot.query.runner.hostname";
   public static final String DEFAULT_QUERY_RUNNER_HOSTNAME = "localhost";
   // query runner port is the mailbox port.
   public static final String KEY_OF_QUERY_RUNNER_PORT = "pinot.query.runner.port";
-  public static final int DEFAULT_QUERY_RUNNER_PORT = -1;
+  public static final int DEFAULT_QUERY_RUNNER_PORT = 0;
 
   private QueryConfig() {
     // do not instantiate.
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 8f00c56bfb..c7dafd8a4e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -183,8 +183,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
     int dataTableVersion =
         _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
     if (dataTableVersion > Server.DEFAULT_CURRENT_DATA_TABLE_VERSION) {
-      throw new UnsupportedOperationException("Setting experimental DataTable version newer than default via config "
-          + "is not allowed. Current default DataTable version: " + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
+      LOGGER.warn("Setting experimental DataTable version newer than default via config could result in"
+          + " backward-compatibility issues. Current default DataTable version: "
+          + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
     }
     DataTableFactory.setDataTableVersion(dataTableVersion);
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
new file mode 100644
index 0000000000..c7179c6856
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+
+import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
+
+
+public class MultistageEngineQuickStart extends QuickStartBase {
+
+  @Override
+  public List<String> types() {
+    return Collections.singletonList("MULTI_STAGE");
+  }
+
+  @Override
+  public Map<String, Object> getConfigOverrides() {
+    Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides());
+    overrides.put("pinot.multistage.engine.enabled", "true");
+    overrides.put("pinot.server.instance.currentDataTableVersion", 4);
+    return overrides;
+  }
+
+  public void execute()
+      throws Exception {
+    File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis()));
+
+    // Baseball stat table
+    File baseBallStatsBaseDir = new File(quickstartTmpDir, "baseballStats");
+    File schemaFile = new File(baseBallStatsBaseDir, "baseballStats_schema.json");
+    File tableConfigFile = new File(baseBallStatsBaseDir, "baseballStats_offline_table_config.json");
+    File ingestionJobSpecFile = new File(baseBallStatsBaseDir, "ingestionJobSpec.yaml");
+    ClassLoader classLoader = Quickstart.class.getClassLoader();
+    URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, schemaFile);
+    resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, tableConfigFile);
+    resource = classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml");
+    Preconditions.checkNotNull(resource);
+    FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
+    QuickstartTableRequest request = new QuickstartTableRequest(baseBallStatsBaseDir.getAbsolutePath());
+
+    File tempDir = new File(quickstartTmpDir, "tmp");
+    FileUtils.forceMkdir(tempDir);
+    QuickstartRunner runner =
+        new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, tempDir, getConfigOverrides());
+
+    printStatus(Quickstart.Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
+    runner.startAll();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        printStatus(Quickstart.Color.GREEN, "***** Shutting down offline quick start *****");
+        runner.stop();
+        FileUtils.deleteDirectory(quickstartTmpDir);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }));
+    printStatus(Quickstart.Color.CYAN, "***** Bootstrap baseballStats table *****");
+    runner.bootstrapTable();
+
+    waitForBootstrapToComplete(null);
+
+    Map<String, String> queryOptions = Collections.singletonMap(
+        CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE, "true");
+
+    printStatus(Quickstart.Color.YELLOW, "***** Multi-stage engine quickstart setup complete *****");
+    String q1 = "SELECT count(*) FROM baseballStats_OFFLINE limit 1";
+    printStatus(Quickstart.Color.YELLOW, "Total number of documents in the table");
+    printStatus(Quickstart.Color.CYAN, "Query : " + q1);
+    printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q1, queryOptions)));
+    printStatus(Quickstart.Color.GREEN, "***************************************************");
+
+    String q2 = "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID"
+        + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON a.playerID = b.playerID"
+        + " WHERE a.runs > 160 AND b.runs < 2";
+    printStatus(Quickstart.Color.YELLOW, "Correlate the same player(s) with more than 160-run some year(s) and"
+        + " with less than 2-run some other year(s)");
+    printStatus(Quickstart.Color.CYAN, "Query : " + q2);
+    printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q2, queryOptions)));
+    printStatus(Quickstart.Color.GREEN, "***************************************************");
+
+    printStatus(Quickstart.Color.GREEN, "***************************************************");
+    printStatus(Quickstart.Color.YELLOW, "Example query run completed.");
+    printStatus(Quickstart.Color.GREEN, "***************************************************");
+    printStatus(Quickstart.Color.YELLOW, "Please use broker port for executing multistage queries.");
+    printStatus(Quickstart.Color.GREEN, "***************************************************");
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    List<String> arguments = new ArrayList<>();
+    arguments.addAll(Arrays.asList("QuickStart", "-type", "MULTI_STAGE"));
+    arguments.addAll(Arrays.asList(args));
+    PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
index f004b5f367..aad2e05430 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
@@ -18,7 +18,8 @@
  */
 package org.apache.pinot.tools.admin.command;
 
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
@@ -62,6 +63,9 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
       + "this message.")
   private boolean _help = false;
 
+  @CommandLine.Option(names = {"-o", "-option"}, required = false, description = "Additional options '-o key=value'")
+  private Map<String, String> _additionalOptions = new HashMap<>();
+
   private AuthProvider _authProvider;
 
   @Override
@@ -124,6 +128,11 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
     return this;
   }
 
+  public PostQueryCommand setAdditionalOptions(Map<String, String> additionalOptions) {
+    _additionalOptions.putAll(additionalOptions);
+    return this;
+  }
+
   public String run()
       throws Exception {
     if (_brokerHost == null) {
@@ -131,7 +140,12 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
     }
     LOGGER.info("Executing command: " + this);
     String url = _brokerProtocol + "://" + _brokerHost + ":" + _brokerPort + "/query/sql";
-    String request = JsonUtils.objectToString(Collections.singletonMap(Request.SQL, _query));
+    Map<String, String> payload = new HashMap<>();
+    payload.put(Request.SQL, _query);
+    if (_additionalOptions != null) {
+      payload.putAll(_additionalOptions);
+    }
+    String request = JsonUtils.objectToString(payload);
     return sendRequest("POST", url, request, makeAuthHeaders(makeAuthProvider(_authProvider,
             _authTokenUrl, _authToken, _user, _password)));
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index c656278d85..c1f7cf4b38 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -53,6 +54,10 @@ public class QuickstartRunner {
   private static final int DEFAULT_SERVER_GRPC_PORT = 7100;
   private static final int DEFAULT_MINION_PORT = 6000;
 
+  private static final int DEFAULT_BROKER_MULTISTAGE_RUNNER_PORT = 8421;
+  private static final int DEFAULT_SERVER_MULTISTAGE_RUNNER_PORT = 8442;
+  private static final int DEFAULT_SERVER_MULTISTAGE_SERVER_PORT = 8842;
+
   private static final String DEFAULT_ZK_DIR = "PinotZkDir";
   private static final String DEFAULT_CONTROLLER_DIR = "PinotControllerDir";
   private static final String DEFAULT_SERVER_DATA_DIR = "PinotServerDataDir";
@@ -132,6 +137,7 @@ public class QuickstartRunner {
     for (int i = 0; i < _numBrokers; i++) {
       StartBrokerCommand brokerStarter = new StartBrokerCommand();
       brokerStarter.setPort(DEFAULT_BROKER_PORT + i)
+          .setBrokerMultiStageRunnerPort(DEFAULT_BROKER_MULTISTAGE_RUNNER_PORT + i)
           .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME)
           .setConfigOverrides(_configOverrides);
       if (!brokerStarter.execute()) {
@@ -147,6 +153,8 @@ public class QuickstartRunner {
       StartServerCommand serverStarter = new StartServerCommand();
       serverStarter.setPort(DEFAULT_SERVER_NETTY_PORT + i).setAdminPort(DEFAULT_SERVER_ADMIN_API_PORT + i)
           .setGrpcPort(DEFAULT_SERVER_GRPC_PORT + i)
+          .setMultiStageServerPort(DEFAULT_SERVER_MULTISTAGE_SERVER_PORT + i)
+          .setMultiStageRunnerPort(DEFAULT_SERVER_MULTISTAGE_RUNNER_PORT + i)
           .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME)
           .setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath())
           .setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath())
@@ -230,10 +238,15 @@ public class QuickstartRunner {
 
   public JsonNode runQuery(String query)
       throws Exception {
+    return runQuery(query, Collections.emptyMap());
+  }
+
+  public JsonNode runQuery(String query, Map<String, String> additionalOptions)
+      throws Exception {
     int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size()));
     return JsonUtils.stringToJsonNode(
         new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setAuthProvider(_authProvider)
-            .setQuery(query).run());
+            .setAdditionalOptions(additionalOptions).setQuery(query).run());
   }
 
   public static void registerDefaultPinotFS() {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
index dffd8e3bdf..e8e5ae7f68 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.tools.Command;
@@ -51,6 +52,10 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
   @CommandLine.Option(names = {"-brokerPort"}, required = false, description = "Broker port number to use for query.")
   private int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
 
+  @CommandLine.Option(names = {"-brokerMultiStageRunnerPort"}, required = false,
+      description = "Broker port number to use for query.")
+  private int _brokerMultiStageRunnerPort = QueryConfig.DEFAULT_QUERY_RUNNER_PORT;
+
   @CommandLine.Option(names = {"-zkAddress"}, required = false, description = "HTTP address of Zookeeper.")
   private String _zkAddress = DEFAULT_ZK_ADDRESS;
 
@@ -76,6 +81,10 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
     return _brokerPort;
   }
 
+  public int getBrokerMultiStageRunnerPort() {
+    return _brokerMultiStageRunnerPort;
+  }
+
   public String getZkAddress() {
     return _zkAddress;
   }
@@ -125,6 +134,11 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
     return this;
   }
 
+  public StartBrokerCommand setBrokerMultiStageRunnerPort(int brokerMultiStageRunnerPort) {
+    _brokerMultiStageRunnerPort = brokerMultiStageRunnerPort;
+    return this;
+  }
+
   public StartBrokerCommand setZkAddress(String zkAddress) {
     _zkAddress = zkAddress;
     return this;
@@ -171,7 +185,8 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
       _zkAddress = MapUtils.getString(properties, CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, _zkAddress);
       _clusterName = MapUtils.getString(properties, CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, _clusterName);
     } else {
-      properties.putAll(PinotConfigUtils.generateBrokerConf(_clusterName, _zkAddress, _brokerHost, _brokerPort));
+      properties.putAll(PinotConfigUtils.generateBrokerConf(_clusterName, _zkAddress, _brokerHost, _brokerPort,
+          _brokerMultiStageRunnerPort));
     }
     if (_configOverrides != null) {
       properties.putAll(_configOverrides);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
index fc85408646..80b2cd70bd 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.tools.Command;
@@ -59,6 +60,14 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
       description = "Port number to serve the grpc query.")
   private int _serverGrpcPort = CommonConstants.Server.DEFAULT_GRPC_PORT;
 
+  @CommandLine.Option(names = {"-serverMultiStageServerPort"}, required = false,
+      description = "Port number to multi-stage query engine service entrypoint.")
+  private int _serverMultiStageServerPort = QueryConfig.DEFAULT_QUERY_SERVER_PORT;
+
+  @CommandLine.Option(names = {"-serverMultiStageRunnerPort"}, required = false,
+      description = "Port number to multi-stage query engine runner communication.")
+  private int _serverMultiStageRunnerPort = QueryConfig.DEFAULT_QUERY_RUNNER_PORT;
+
   @CommandLine.Option(names = {"-dataDir"}, required = false, description = "Path to directory containing data.")
   private String _dataDir = PinotConfigUtils.TMP_DIR + "data/pinotServerData";
 
@@ -100,6 +109,14 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
     return _serverGrpcPort;
   }
 
+  public int getServerMultiStageServerPort() {
+    return _serverMultiStageServerPort;
+  }
+
+  public int getServerMultiStageRunnerPort() {
+    return _serverMultiStageRunnerPort;
+  }
+
   public String getDataDir() {
     return _dataDir;
   }
@@ -149,6 +166,16 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
     return this;
   }
 
+  public StartServerCommand setMultiStageServerPort(int multiStageServerPort) {
+    _serverMultiStageServerPort = multiStageServerPort;
+    return this;
+  }
+
+  public StartServerCommand setMultiStageRunnerPort(int multiStageRunnerPort) {
+    _serverMultiStageRunnerPort = multiStageRunnerPort;
+    return this;
+  }
+
   public StartServerCommand setDataDir(String dataDir) {
     _dataDir = dataDir;
     return this;
@@ -173,11 +200,15 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
   public String toString() {
     if (_configFileName != null) {
       return ("StartServer -clusterName " + _clusterName + " -serverHost " + _serverHost + " -serverPort " + _serverPort
-          + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort + " -configFileName "
-          + _configFileName + " -zkAddress " + _zkAddress);
+          + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort
+          + " -serverMultistageServerPort " + _serverMultiStageServerPort
+          + " -serverMultistageRunnerPort " + _serverMultiStageRunnerPort + " -configFileName " + _configFileName
+          + " -zkAddress " + _zkAddress);
     } else {
       return ("StartServer -clusterName " + _clusterName + " -serverHost " + _serverHost + " -serverPort " + _serverPort
-          + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort + " -dataDir " + _dataDir
+          + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort
+          + " -serverMultistageServerPort " + _serverMultiStageServerPort
+          + " -serverMultistageRunnerPort " + _serverMultiStageRunnerPort + " -dataDir " + _dataDir
           + " -segmentDir " + _segmentDir + " -zkAddress " + _zkAddress);
     }
   }
@@ -229,7 +260,7 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
     } else {
       properties.putAll(PinotConfigUtils
           .generateServerConf(_clusterName, _zkAddress, _serverHost, _serverPort, _serverAdminPort, _serverGrpcPort,
-              _dataDir, _segmentDir));
+              _serverMultiStageServerPort, _serverMultiStageRunnerPort, _dataDir, _segmentDir));
     }
     if (_configOverrides != null) {
       properties.putAll(_configOverrides);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
index 1a5765faa8..b3274077ac 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.tools.Command;
@@ -218,11 +219,13 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme
             ControllerConf.ControllerMode.DUAL, true);
       case BROKER:
         return PinotConfigUtils
-            .generateBrokerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT);
+            .generateBrokerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT,
+                QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
       case SERVER:
         return PinotConfigUtils
             .generateServerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT,
-                CommonConstants.Server.DEFAULT_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_GRPC_PORT, null, null);
+                CommonConstants.Server.DEFAULT_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_GRPC_PORT,
+                QueryConfig.DEFAULT_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT, null, null);
       default:
         throw new RuntimeException("No default config found for service role: " + serviceRole);
     }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 046b2ca4e4..98b59dc1e7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf;
+import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.NetUtils;
@@ -149,7 +150,7 @@ public class PinotConfigUtils {
   }
 
   public static Map<String, Object> generateBrokerConf(String clusterName, String zkAddress, String brokerHost,
-      int brokerPort)
+      int brokerPort, int brokerMultiStageRunnerPort)
       throws SocketException, UnknownHostException {
     Map<String, Object> properties = new HashMap<>();
     properties.put(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, clusterName);
@@ -157,11 +158,14 @@ public class PinotConfigUtils {
     properties.put(CommonConstants.Broker.CONFIG_OF_BROKER_HOSTNAME,
         !StringUtils.isEmpty(brokerHost) ? brokerHost : NetUtils.getHostAddress());
     properties.put(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, brokerPort != 0 ? brokerPort : getAvailablePort());
+    properties.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, brokerMultiStageRunnerPort != 0
+        ? brokerMultiStageRunnerPort : getAvailablePort());
     return properties;
   }
 
   public static Map<String, Object> generateServerConf(String clusterName, String zkAddress, String serverHost,
-      int serverPort, int serverAdminPort, int serverGrpcPort, String serverDataDir, String serverSegmentDir)
+      int serverPort, int serverAdminPort, int serverGrpcPort, int serverMultiStageServerPort,
+      int serverMultiStageRunnerPort, String serverDataDir, String serverSegmentDir)
       throws SocketException, UnknownHostException {
     if (serverHost == null) {
       serverHost = NetUtils.getHostAddress();
@@ -183,6 +187,10 @@ public class PinotConfigUtils {
     properties.put(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkAddress);
     properties.put(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, serverHost);
     properties.put(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT, serverPort);
+    properties.put(QueryConfig.KEY_OF_QUERY_SERVER_PORT, serverMultiStageServerPort != 0
+        ? serverMultiStageServerPort : getAvailablePort());
+    properties.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, serverMultiStageRunnerPort != 0
+        ? serverMultiStageRunnerPort : getAvailablePort());
     properties.put(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, serverAdminPort);
     properties.put(CommonConstants.Server.CONFIG_OF_GRPC_PORT, serverGrpcPort);
     properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir);
diff --git a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
index 168c698105..09c86b127b 100644
--- a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
+++ b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
@@ -4,7 +4,7 @@
   "segmentsConfig": {
     "segmentPushType": "APPEND",
     "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
-    "schemaName": "baseball",
+    "schemaName": "baseballStats",
     "replication": "1"
   },
   "tenants": {
diff --git a/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json
index eadbf8641c..b7a079870d 100644
--- a/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json
+++ b/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json
@@ -4,7 +4,7 @@
   "segmentsConfig": {
     "segmentPushType": "APPEND",
     "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
-    "schemaName": "baseball",
+    "schemaName": "baseballStats",
     "replication": "1"
   },
   "tenants": {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org