You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/07/02 02:59:14 UTC
[pinot] branch master updated: adding MultiStageEngineQuickStart (#8980)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eda488f5d7 adding MultiStageEngineQuickStart (#8980)
eda488f5d7 is described below
commit eda488f5d7aeb68255c3452d00daacffbbbdd477
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Fri Jul 1 19:59:07 2022 -0700
adding MultiStageEngineQuickStart (#8980)
* adding MultiStageEngineQuickStart
* address diff comments
* add more queries
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../tests/MultiStageEngineIntegrationTest.java | 3 +-
.../query/runtime/utils/ServerRequestUtils.java | 2 +
.../apache/pinot/query/service/QueryConfig.java | 4 +-
.../server/starter/helix/BaseServerStarter.java | 5 +-
.../pinot/tools/MultistageEngineQuickStart.java | 129 +++++++++++++++++++++
.../tools/admin/command/PostQueryCommand.java | 18 ++-
.../tools/admin/command/QuickstartRunner.java | 15 ++-
.../tools/admin/command/StartBrokerCommand.java | 17 ++-
.../tools/admin/command/StartServerCommand.java | 39 ++++++-
.../admin/command/StartServiceManagerCommand.java | 7 +-
.../apache/pinot/tools/utils/PinotConfigUtils.java | 12 +-
.../baseballStats_offline_table_config.json | 2 +-
.../baseballStats_offline_table_config.json | 2 +-
13 files changed, 236 insertions(+), 19 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 0d78d2b182..e5c92c1799 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -115,7 +115,8 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest
return new Object[][] {
new Object[]{"SELECT COUNT(*) FROM mytable_OFFLINE WHERE Carrier='AA'", 1, 1},
new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>1000", 2, 73},
- new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE WHERE CarrierDelay=15 AND ArrDelay>20", 10, 2},
+ new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE"
+ + " WHERE CarrierDelay=15 AND ArrDelay>20", 172, 2},
new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.Origin = b.Origin "
+ " WHERE a.Carrier='AA' AND a.ArrDelay>1000 AND b.ArrDelay>1000", 2, 146}
};
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 15a8b0194c..738992fe52 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -46,6 +46,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
* conversion step is needed so that the V2 query plan can be converted into a compatible format to run V1 executor.
*/
public class ServerRequestUtils {
+ private static final int DEFAULT_LEAF_NODE_LIMIT = 1_000_000;
private ServerRequestUtils() {
// do not instantiate.
@@ -84,6 +85,7 @@ public class ServerRequestUtils {
public static PinotQuery constructPinotQuery(DistributedStagePlan distributedStagePlan) {
PinotQuery pinotQuery = new PinotQuery();
+ pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
pinotQuery.setExplain(false);
walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery);
return pinotQuery;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index 8de432f3ea..c0bbb08174 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -23,13 +23,13 @@ package org.apache.pinot.query.service;
*/
public class QueryConfig {
public static final String KEY_OF_QUERY_SERVER_PORT = "pinot.query.server.port";
- public static final int DEFAULT_QUERY_SERVER_PORT = -1;
+ public static final int DEFAULT_QUERY_SERVER_PORT = 0;
public static final String KEY_OF_QUERY_RUNNER_HOSTNAME = "pinot.query.runner.hostname";
public static final String DEFAULT_QUERY_RUNNER_HOSTNAME = "localhost";
// query runner port is the mailbox port.
public static final String KEY_OF_QUERY_RUNNER_PORT = "pinot.query.runner.port";
- public static final int DEFAULT_QUERY_RUNNER_PORT = -1;
+ public static final int DEFAULT_QUERY_RUNNER_PORT = 0;
private QueryConfig() {
// do not instantiate.
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 8f00c56bfb..c7dafd8a4e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -183,8 +183,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
int dataTableVersion =
_serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
if (dataTableVersion > Server.DEFAULT_CURRENT_DATA_TABLE_VERSION) {
- throw new UnsupportedOperationException("Setting experimental DataTable version newer than default via config "
- + "is not allowed. Current default DataTable version: " + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
+ LOGGER.warn("Setting experimental DataTable version newer than default via config could result in"
+ + " backward-compatibility issues. Current default DataTable version: "
+ + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
}
DataTableFactory.setDataTableVersion(dataTableVersion);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
new file mode 100644
index 0000000000..c7179c6856
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+
+import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
+
+
+public class MultistageEngineQuickStart extends QuickStartBase {
+
+ @Override
+ public List<String> types() {
+ return Collections.singletonList("MULTI_STAGE");
+ }
+
+ @Override
+ public Map<String, Object> getConfigOverrides() {
+ Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides());
+ overrides.put("pinot.multistage.engine.enabled", "true");
+ overrides.put("pinot.server.instance.currentDataTableVersion", 4);
+ return overrides;
+ }
+
+ public void execute()
+ throws Exception {
+ File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis()));
+
+ // Baseball stat table
+ File baseBallStatsBaseDir = new File(quickstartTmpDir, "baseballStats");
+ File schemaFile = new File(baseBallStatsBaseDir, "baseballStats_schema.json");
+ File tableConfigFile = new File(baseBallStatsBaseDir, "baseballStats_offline_table_config.json");
+ File ingestionJobSpecFile = new File(baseBallStatsBaseDir, "ingestionJobSpec.yaml");
+ ClassLoader classLoader = Quickstart.class.getClassLoader();
+ URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
+ Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, schemaFile);
+ resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json");
+ Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, tableConfigFile);
+ resource = classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml");
+ Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
+ QuickstartTableRequest request = new QuickstartTableRequest(baseBallStatsBaseDir.getAbsolutePath());
+
+ File tempDir = new File(quickstartTmpDir, "tmp");
+ FileUtils.forceMkdir(tempDir);
+ QuickstartRunner runner =
+ new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, tempDir, getConfigOverrides());
+
+ printStatus(Quickstart.Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
+ runner.startAll();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ printStatus(Quickstart.Color.GREEN, "***** Shutting down offline quick start *****");
+ runner.stop();
+ FileUtils.deleteDirectory(quickstartTmpDir);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ printStatus(Quickstart.Color.CYAN, "***** Bootstrap baseballStats table *****");
+ runner.bootstrapTable();
+
+ waitForBootstrapToComplete(null);
+
+ Map<String, String> queryOptions = Collections.singletonMap(
+ CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE, "true");
+
+ printStatus(Quickstart.Color.YELLOW, "***** Multi-stage engine quickstart setup complete *****");
+ String q1 = "SELECT count(*) FROM baseballStats_OFFLINE limit 1";
+ printStatus(Quickstart.Color.YELLOW, "Total number of documents in the table");
+ printStatus(Quickstart.Color.CYAN, "Query : " + q1);
+ printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q1, queryOptions)));
+ printStatus(Quickstart.Color.GREEN, "***************************************************");
+
+ String q2 = "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID"
+ + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON a.playerID = b.playerID"
+ + " WHERE a.runs > 160 AND b.runs < 2";
+ printStatus(Quickstart.Color.YELLOW, "Correlate the same player(s) with more than 160-run some year(s) and"
+ + " with less than 2-run some other year(s)");
+ printStatus(Quickstart.Color.CYAN, "Query : " + q2);
+ printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q2, queryOptions)));
+ printStatus(Quickstart.Color.GREEN, "***************************************************");
+
+ printStatus(Quickstart.Color.GREEN, "***************************************************");
+ printStatus(Quickstart.Color.YELLOW, "Example query run completed.");
+ printStatus(Quickstart.Color.GREEN, "***************************************************");
+ printStatus(Quickstart.Color.YELLOW, "Please use broker port for executing multistage queries.");
+ printStatus(Quickstart.Color.GREEN, "***************************************************");
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ List<String> arguments = new ArrayList<>();
+ arguments.addAll(Arrays.asList("QuickStart", "-type", "MULTI_STAGE"));
+ arguments.addAll(Arrays.asList(args));
+ PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
index f004b5f367..aad2e05430 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java
@@ -18,7 +18,8 @@
*/
package org.apache.pinot.tools.admin.command;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
@@ -62,6 +63,9 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
+ "this message.")
private boolean _help = false;
+ @CommandLine.Option(names = {"-o", "-option"}, required = false, description = "Additional options '-o key=value'")
+ private Map<String, String> _additionalOptions = new HashMap<>();
+
private AuthProvider _authProvider;
@Override
@@ -124,6 +128,11 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
return this;
}
+ public PostQueryCommand setAdditionalOptions(Map<String, String> additionalOptions) {
+ _additionalOptions.putAll(additionalOptions);
+ return this;
+ }
+
public String run()
throws Exception {
if (_brokerHost == null) {
@@ -131,7 +140,12 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman
}
LOGGER.info("Executing command: " + this);
String url = _brokerProtocol + "://" + _brokerHost + ":" + _brokerPort + "/query/sql";
- String request = JsonUtils.objectToString(Collections.singletonMap(Request.SQL, _query));
+ Map<String, String> payload = new HashMap<>();
+ payload.put(Request.SQL, _query);
+ if (_additionalOptions != null) {
+ payload.putAll(_additionalOptions);
+ }
+ String request = JsonUtils.objectToString(payload);
return sendRequest("POST", url, request, makeAuthHeaders(makeAuthProvider(_authProvider,
_authTokenUrl, _authToken, _user, _password)));
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index c656278d85..c1f7cf4b38 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -53,6 +54,10 @@ public class QuickstartRunner {
private static final int DEFAULT_SERVER_GRPC_PORT = 7100;
private static final int DEFAULT_MINION_PORT = 6000;
+ private static final int DEFAULT_BROKER_MULTISTAGE_RUNNER_PORT = 8421;
+ private static final int DEFAULT_SERVER_MULTISTAGE_RUNNER_PORT = 8442;
+ private static final int DEFAULT_SERVER_MULTISTAGE_SERVER_PORT = 8842;
+
private static final String DEFAULT_ZK_DIR = "PinotZkDir";
private static final String DEFAULT_CONTROLLER_DIR = "PinotControllerDir";
private static final String DEFAULT_SERVER_DATA_DIR = "PinotServerDataDir";
@@ -132,6 +137,7 @@ public class QuickstartRunner {
for (int i = 0; i < _numBrokers; i++) {
StartBrokerCommand brokerStarter = new StartBrokerCommand();
brokerStarter.setPort(DEFAULT_BROKER_PORT + i)
+ .setBrokerMultiStageRunnerPort(DEFAULT_BROKER_MULTISTAGE_RUNNER_PORT + i)
.setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME)
.setConfigOverrides(_configOverrides);
if (!brokerStarter.execute()) {
@@ -147,6 +153,8 @@ public class QuickstartRunner {
StartServerCommand serverStarter = new StartServerCommand();
serverStarter.setPort(DEFAULT_SERVER_NETTY_PORT + i).setAdminPort(DEFAULT_SERVER_ADMIN_API_PORT + i)
.setGrpcPort(DEFAULT_SERVER_GRPC_PORT + i)
+ .setMultiStageServerPort(DEFAULT_SERVER_MULTISTAGE_SERVER_PORT + i)
+ .setMultiStageRunnerPort(DEFAULT_SERVER_MULTISTAGE_RUNNER_PORT + i)
.setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME)
.setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath())
.setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath())
@@ -230,10 +238,15 @@ public class QuickstartRunner {
public JsonNode runQuery(String query)
throws Exception {
+ return runQuery(query, Collections.emptyMap());
+ }
+
+ public JsonNode runQuery(String query, Map<String, String> additionalOptions)
+ throws Exception {
int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size()));
return JsonUtils.stringToJsonNode(
new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setAuthProvider(_authProvider)
- .setQuery(query).run());
+ .setAdditionalOptions(additionalOptions).setQuery(query).run());
}
public static void registerDefaultPinotFS() {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
index dffd8e3bdf..e8e5ae7f68 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.tools.Command;
@@ -51,6 +52,10 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
@CommandLine.Option(names = {"-brokerPort"}, required = false, description = "Broker port number to use for query.")
private int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
+ @CommandLine.Option(names = {"-brokerMultiStageRunnerPort"}, required = false,
+ description = "Broker port number to use for query.")
+ private int _brokerMultiStageRunnerPort = QueryConfig.DEFAULT_QUERY_RUNNER_PORT;
+
@CommandLine.Option(names = {"-zkAddress"}, required = false, description = "HTTP address of Zookeeper.")
private String _zkAddress = DEFAULT_ZK_ADDRESS;
@@ -76,6 +81,10 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
return _brokerPort;
}
+ public int getBrokerMultiStageRunnerPort() {
+ return _brokerMultiStageRunnerPort;
+ }
+
public String getZkAddress() {
return _zkAddress;
}
@@ -125,6 +134,11 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
return this;
}
+ public StartBrokerCommand setBrokerMultiStageRunnerPort(int brokerMultiStageRunnerPort) {
+ _brokerMultiStageRunnerPort = brokerMultiStageRunnerPort;
+ return this;
+ }
+
public StartBrokerCommand setZkAddress(String zkAddress) {
_zkAddress = zkAddress;
return this;
@@ -171,7 +185,8 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
_zkAddress = MapUtils.getString(properties, CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, _zkAddress);
_clusterName = MapUtils.getString(properties, CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, _clusterName);
} else {
- properties.putAll(PinotConfigUtils.generateBrokerConf(_clusterName, _zkAddress, _brokerHost, _brokerPort));
+ properties.putAll(PinotConfigUtils.generateBrokerConf(_clusterName, _zkAddress, _brokerHost, _brokerPort,
+ _brokerMultiStageRunnerPort));
}
if (_configOverrides != null) {
properties.putAll(_configOverrides);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
index fc85408646..80b2cd70bd 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.tools.Command;
@@ -59,6 +60,14 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
description = "Port number to serve the grpc query.")
private int _serverGrpcPort = CommonConstants.Server.DEFAULT_GRPC_PORT;
+ @CommandLine.Option(names = {"-serverMultiStageServerPort"}, required = false,
+ description = "Port number to multi-stage query engine service entrypoint.")
+ private int _serverMultiStageServerPort = QueryConfig.DEFAULT_QUERY_SERVER_PORT;
+
+ @CommandLine.Option(names = {"-serverMultiStageRunnerPort"}, required = false,
+ description = "Port number to multi-stage query engine runner communication.")
+ private int _serverMultiStageRunnerPort = QueryConfig.DEFAULT_QUERY_RUNNER_PORT;
+
@CommandLine.Option(names = {"-dataDir"}, required = false, description = "Path to directory containing data.")
private String _dataDir = PinotConfigUtils.TMP_DIR + "data/pinotServerData";
@@ -100,6 +109,14 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
return _serverGrpcPort;
}
+ public int getServerMultiStageServerPort() {
+ return _serverMultiStageServerPort;
+ }
+
+ public int getServerMultiStageRunnerPort() {
+ return _serverMultiStageRunnerPort;
+ }
+
public String getDataDir() {
return _dataDir;
}
@@ -149,6 +166,16 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
return this;
}
+ public StartServerCommand setMultiStageServerPort(int multiStageServerPort) {
+ _serverMultiStageServerPort = multiStageServerPort;
+ return this;
+ }
+
+ public StartServerCommand setMultiStageRunnerPort(int multiStageRunnerPort) {
+ _serverMultiStageRunnerPort = multiStageRunnerPort;
+ return this;
+ }
+
public StartServerCommand setDataDir(String dataDir) {
_dataDir = dataDir;
return this;
@@ -173,11 +200,15 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
public String toString() {
if (_configFileName != null) {
return ("StartServer -clusterName " + _clusterName + " -serverHost " + _serverHost + " -serverPort " + _serverPort
- + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort + " -configFileName "
- + _configFileName + " -zkAddress " + _zkAddress);
+ + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort
+ + " -serverMultistageServerPort " + _serverMultiStageServerPort
+ + " -serverMultistageRunnerPort " + _serverMultiStageRunnerPort + " -configFileName " + _configFileName
+ + " -zkAddress " + _zkAddress);
} else {
return ("StartServer -clusterName " + _clusterName + " -serverHost " + _serverHost + " -serverPort " + _serverPort
- + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort + " -dataDir " + _dataDir
+ + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort
+ + " -serverMultistageServerPort " + _serverMultiStageServerPort
+ + " -serverMultistageRunnerPort " + _serverMultiStageRunnerPort + " -dataDir " + _dataDir
+ " -segmentDir " + _segmentDir + " -zkAddress " + _zkAddress);
}
}
@@ -229,7 +260,7 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm
} else {
properties.putAll(PinotConfigUtils
.generateServerConf(_clusterName, _zkAddress, _serverHost, _serverPort, _serverAdminPort, _serverGrpcPort,
- _dataDir, _segmentDir));
+ _serverMultiStageServerPort, _serverMultiStageRunnerPort, _dataDir, _segmentDir));
}
if (_configOverrides != null) {
properties.putAll(_configOverrides);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
index 1a5765faa8..b3274077ac 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.tools.Command;
@@ -218,11 +219,13 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme
ControllerConf.ControllerMode.DUAL, true);
case BROKER:
return PinotConfigUtils
- .generateBrokerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT);
+ .generateBrokerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT,
+ QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
case SERVER:
return PinotConfigUtils
.generateServerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT,
- CommonConstants.Server.DEFAULT_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_GRPC_PORT, null, null);
+ CommonConstants.Server.DEFAULT_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_GRPC_PORT,
+ QueryConfig.DEFAULT_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT, null, null);
default:
throw new RuntimeException("No default config found for service role: " + serviceRole);
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 046b2ca4e4..98b59dc1e7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf;
+import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
@@ -149,7 +150,7 @@ public class PinotConfigUtils {
}
public static Map<String, Object> generateBrokerConf(String clusterName, String zkAddress, String brokerHost,
- int brokerPort)
+ int brokerPort, int brokerMultiStageRunnerPort)
throws SocketException, UnknownHostException {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, clusterName);
@@ -157,11 +158,14 @@ public class PinotConfigUtils {
properties.put(CommonConstants.Broker.CONFIG_OF_BROKER_HOSTNAME,
!StringUtils.isEmpty(brokerHost) ? brokerHost : NetUtils.getHostAddress());
properties.put(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, brokerPort != 0 ? brokerPort : getAvailablePort());
+ properties.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, brokerMultiStageRunnerPort != 0
+ ? brokerMultiStageRunnerPort : getAvailablePort());
return properties;
}
public static Map<String, Object> generateServerConf(String clusterName, String zkAddress, String serverHost,
- int serverPort, int serverAdminPort, int serverGrpcPort, String serverDataDir, String serverSegmentDir)
+ int serverPort, int serverAdminPort, int serverGrpcPort, int serverMultiStageServerPort,
+ int serverMultiStageRunnerPort, String serverDataDir, String serverSegmentDir)
throws SocketException, UnknownHostException {
if (serverHost == null) {
serverHost = NetUtils.getHostAddress();
@@ -183,6 +187,10 @@ public class PinotConfigUtils {
properties.put(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkAddress);
properties.put(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, serverHost);
properties.put(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT, serverPort);
+ properties.put(QueryConfig.KEY_OF_QUERY_SERVER_PORT, serverMultiStageServerPort != 0
+ ? serverMultiStageServerPort : getAvailablePort());
+ properties.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, serverMultiStageRunnerPort != 0
+ ? serverMultiStageRunnerPort : getAvailablePort());
properties.put(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, serverAdminPort);
properties.put(CommonConstants.Server.CONFIG_OF_GRPC_PORT, serverGrpcPort);
properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir);
diff --git a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
index 168c698105..09c86b127b 100644
--- a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
+++ b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
@@ -4,7 +4,7 @@
"segmentsConfig": {
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
- "schemaName": "baseball",
+ "schemaName": "baseballStats",
"replication": "1"
},
"tenants": {
diff --git a/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json
index eadbf8641c..b7a079870d 100644
--- a/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json
+++ b/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json
@@ -4,7 +4,7 @@
"segmentsConfig": {
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
- "schemaName": "baseball",
+ "schemaName": "baseballStats",
"replication": "1"
},
"tenants": {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org