You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/11 21:17:30 UTC
[pinot] branch master updated: Cleanup and simplify the integration test instance start (#8507)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 68c89b74f5 Cleanup and simplify the integration test instance start (#8507)
68c89b74f5 is described below
commit 68c89b74f5111a0c53d632f932f080b5e394ff44
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Apr 11 14:17:24 2022 -0700
Cleanup and simplify the integration test instance start (#8507)
---
.../broker/broker/helix/BaseBrokerStarter.java | 5 +
.../pinot/integration/tests/ClusterTest.java | 286 +++++++--------------
.../tests/AdminConsoleIntegrationTest.java | 3 -
.../tests/GrpcBrokerClusterIntegrationTest.java | 25 +-
.../MultiNodesOfflineClusterIntegrationTest.java | 53 +++-
.../tests/OfflineClusterIntegrationTest.java | 19 +-
.../tests/OfflineGRPCServerIntegrationTest.java | 76 +-----
.../OfflineSecureGRPCServerIntegrationTest.java | 25 +-
...rDownloadLLCRealtimeClusterIntegrationTest.java | 3 +-
.../UpsertTableSegmentUploadIntegrationTest.java | 2 +-
10 files changed, 188 insertions(+), 309 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 9b467e77d4..852fc5ae41 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -148,6 +148,10 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
_brokerConf.getProperty(Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS, Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}
+ public int getPort() {
+ return _port;
+ }
+
/**
* Adds an ideal state change handler to handle Helix ideal state change callbacks.
* <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change
@@ -261,6 +265,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null);
}
}
+ _brokerRequestHandler.start();
LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_brokerAdminApplication =
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index ed28045f04..ea15a66ce1 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -81,7 +81,6 @@ import static org.testng.Assert.assertTrue;
* Base class for integration tests that involve a complete Pinot cluster.
*/
public abstract class ClusterTest extends ControllerTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTest.class);
protected static final int DEFAULT_BROKER_PORT = 18099;
protected static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -96,75 +95,64 @@ public abstract class ClusterTest extends ControllerTest {
return new PinotConfiguration();
}
- protected void startBroker()
- throws Exception {
- startBrokers(1);
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ // Do nothing, to be overridden by tests if they need something specific
}
- protected void startBroker(int port, String zkStr)
+ protected void startBroker()
throws Exception {
- startBrokers(1, port, zkStr, Collections.emptyMap());
+ startBrokers(1);
}
protected void startBrokers(int numBrokers)
throws Exception {
- startBrokers(numBrokers, DEFAULT_BROKER_PORT, getZkUrl(), Collections.emptyMap());
- }
-
- protected void startBrokers(int numBrokers, int basePort, String zkStr)
- throws Exception {
- startBrokers(numBrokers, basePort, zkStr, Collections.emptyMap());
- }
-
- protected void startBrokers(int numBrokers, int basePort, String zkStr, Map<String, Object> extraProperties)
- throws Exception {
_brokerStarters = new ArrayList<>(numBrokers);
_brokerPorts = new ArrayList<>();
for (int i = 0; i < numBrokers; i++) {
- Map<String, Object> properties = getDefaultBrokerConfiguration().toMap();
- properties.put(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
- properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkStr);
- properties.put(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
- int port = NetUtils.findOpenPort(basePort + i);
- _brokerPorts.add(port);
- properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, port);
- properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
- properties.putAll(extraProperties);
- PinotConfiguration configuration = new PinotConfiguration(properties);
- overrideBrokerConf(configuration);
-
- HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
- brokerStarter.init(configuration);
- brokerStarter.start();
+ HelixBrokerStarter brokerStarter = startOneBroker(i);
_brokerStarters.add(brokerStarter);
+ _brokerPorts.add(brokerStarter.getPort());
}
_brokerBaseApiUrl = "http://localhost:" + _brokerPorts.get(0);
}
+ protected HelixBrokerStarter startOneBroker(int brokerId)
+ throws Exception {
+ PinotConfiguration brokerConf = getDefaultBrokerConfiguration();
+ brokerConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+ brokerConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+ brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+ brokerConf.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, NetUtils.findOpenPort(DEFAULT_BROKER_PORT + brokerId));
+ brokerConf.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+ overrideBrokerConf(brokerConf);
+
+ HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
+ brokerStarter.init(brokerConf);
+ brokerStarter.start();
+ return brokerStarter;
+ }
+
protected void startBrokerHttps()
throws Exception {
_brokerStarters = new ArrayList<>();
_brokerPorts = new ArrayList<>();
- Map<String, Object> properties = getDefaultBrokerConfiguration().toMap();
- properties.put(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
- properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
-
- properties.put(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
- properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
- properties.put(Broker.CONFIG_OF_BROKER_HOSTNAME, LOCAL_HOST);
-
- PinotConfiguration configuration = new PinotConfiguration(properties);
- overrideBrokerConf(configuration);
+ PinotConfiguration brokerConf = getDefaultBrokerConfiguration();
+ brokerConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+ brokerConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+ brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+ brokerConf.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME, LOCAL_HOST);
+ brokerConf.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+ overrideBrokerConf(brokerConf);
HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
- brokerStarter.init(configuration);
+ brokerStarter.init(brokerConf);
brokerStarter.start();
_brokerStarters.add(brokerStarter);
// TLS configs require hard-coding
_brokerPorts.add(DEFAULT_BROKER_PORT);
- _brokerBaseApiUrl = "https://localhost:" + _brokerPorts.get(0);
+ _brokerBaseApiUrl = "https://localhost:" + DEFAULT_BROKER_PORT;
}
protected int getRandomBrokerPort() {
@@ -189,85 +177,64 @@ public abstract class ClusterTest extends ControllerTest {
return configuration;
}
- protected void startServer() {
- startServers(1);
- }
-
- protected void startServer(PinotConfiguration configuration) {
- startServers(1, configuration);
- }
-
- protected void startServers(int numServers) {
- startServers(numServers, getDefaultServerConfiguration());
- }
-
- protected void startServers(int numServers, PinotConfiguration configuration) {
- startServers(numServers, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
- }
-
- protected void startServers(int numServers, int baseAdminApiPort, int baseNettyPort, String zkStr) {
- startServers(numServers, getDefaultServerConfiguration(), baseAdminApiPort, baseNettyPort, zkStr);
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ // Do nothing, to be overridden by tests if they need something specific
}
- protected void startServers(int numServers, PinotConfiguration configuration, int baseAdminApiPort, int baseNettyPort,
- String zkStr) {
- startServers(numServers, configuration, baseAdminApiPort, baseNettyPort, Server.DEFAULT_GRPC_PORT, zkStr);
+ protected void startServer()
+ throws Exception {
+ startServers(1);
}
- protected void startServers(int numServers, PinotConfiguration configuration, int baseAdminApiPort, int baseNettyPort,
- int baseGrpcPort, String zkStr) {
+ protected void startServers(int numServers)
+ throws Exception {
FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
_serverStarters = new ArrayList<>(numServers);
- overrideServerConf(configuration);
- try {
- for (int i = 0; i < numServers; i++) {
- configuration.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
- configuration.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkStr);
- configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
- configuration.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
- Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
- configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
- configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
- if (configuration.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, false)) {
- configuration.setProperty(Server.CONFIG_OF_GRPC_PORT, baseGrpcPort + i);
- }
- configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
- // Thread time measurement is disabled by default, enable it in integration tests.
- // TODO: this can be removed when we eventually enable thread time measurement by default.
- configuration.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
- HelixServerStarter helixServerStarter = new HelixServerStarter();
- helixServerStarter.init(configuration);
- helixServerStarter.start();
- _serverStarters.add(helixServerStarter);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ for (int i = 0; i < numServers; i++) {
+ _serverStarters.add(startOneServer(i));
}
}
- protected List<HelixServerStarter> getServerStarters() {
- return _serverStarters;
- }
-
- protected void startServerHttps() {
+ protected HelixServerStarter startOneServer(int serverId)
+ throws Exception {
+ PinotConfiguration serverConf = getDefaultServerConfiguration();
+ serverConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+ serverConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+ serverConf.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + serverId);
+ serverConf.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
+ Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + serverId);
+ serverConf.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, Server.DEFAULT_ADMIN_API_PORT - serverId);
+ serverConf.setProperty(Server.CONFIG_OF_NETTY_PORT, Helix.DEFAULT_SERVER_NETTY_PORT + serverId);
+ serverConf.setProperty(Server.CONFIG_OF_GRPC_PORT, Server.DEFAULT_GRPC_PORT + serverId);
+ // Thread time measurement is disabled by default, enable it in integration tests.
+ // TODO: this can be removed when we eventually enable thread time measurement by default.
+ serverConf.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+ overrideServerConf(serverConf);
+
+ HelixServerStarter serverStarter = new HelixServerStarter();
+ serverStarter.init(serverConf);
+ serverStarter.start();
+ return serverStarter;
+ }
+
+ protected void startServerHttps()
+ throws Exception {
FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
_serverStarters = new ArrayList<>();
- Map<String, Object> properties = getDefaultServerConfiguration().toMap();
- properties.put(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
- properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+ PinotConfiguration serverConf = getDefaultServerConfiguration();
+ serverConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+ serverConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+ overrideServerConf(serverConf);
- PinotConfiguration configuration = new PinotConfiguration(properties);
- overrideServerConf(configuration);
+ HelixServerStarter serverStarter = new HelixServerStarter();
+ serverStarter.init(serverConf);
+ serverStarter.start();
+ _serverStarters.add(serverStarter);
+ }
- try {
- HelixServerStarter helixServerStarter = new HelixServerStarter();
- helixServerStarter.init(configuration);
- _serverStarters.add(helixServerStarter);
- helixServerStarter.start();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ protected List<HelixServerStarter> getServerStarters() {
+ return _serverStarters;
}
protected PinotConfiguration getDefaultMinionConfiguration() {
@@ -276,107 +243,54 @@ public abstract class ClusterTest extends ControllerTest {
// NOTE: We don't allow multiple Minion instances in the same JVM because Minion uses singleton class MinionContext
// to manage the instance level configs
- protected void startMinion() {
+ protected void startMinion()
+ throws Exception {
FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR));
- try {
- PinotConfiguration minionConf = getDefaultMinionConfiguration();
- minionConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
- minionConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
- _minionStarter = new MinionStarter();
- _minionStarter.init(minionConf);
- _minionStarter.start();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- protected void overrideServerConf(PinotConfiguration configuration) {
- // Do nothing, to be overridden by tests if they need something specific
- }
-
- protected void overrideBrokerConf(PinotConfiguration configuration) {
- // Do nothing, to be overridden by tests if they need something specific
+ PinotConfiguration minionConf = getDefaultMinionConfiguration();
+ minionConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+ minionConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+ _minionStarter = new MinionStarter();
+ _minionStarter.init(minionConf);
+ _minionStarter.start();
}
protected void stopBroker() {
assertNotNull(_brokerStarters, "Brokers are not started");
for (HelixBrokerStarter brokerStarter : _brokerStarters) {
- try {
- brokerStarter.stop();
- } catch (Exception e) {
- LOGGER.error("Encountered exception while stopping broker {}", e.getMessage());
- }
+ brokerStarter.stop();
}
_brokerStarters = null;
}
protected void stopServer() {
assertNotNull(_serverStarters, "Servers are not started");
- for (HelixServerStarter helixServerStarter : _serverStarters) {
- try {
- helixServerStarter.stop();
- } catch (Exception e) {
- LOGGER.error("Encountered exception while stopping server {}", e.getMessage());
- }
+ for (HelixServerStarter serverStarter : _serverStarters) {
+ serverStarter.stop();
}
FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
_serverStarters = null;
}
- protected void restartServers(int numServers) {
- assertNotNull(_serverStarters, "Servers are not started");
- for (HelixServerStarter helixServerStarter : _serverStarters) {
- try {
- helixServerStarter.stop();
- } catch (Exception e) {
- LOGGER.error("Encountered exception while stopping server {}", e.getMessage());
- }
- }
-
- _serverStarters = new ArrayList<>(numServers);
- String zkStr = getZkUrl();
- int baseAdminApiPort = Server.DEFAULT_ADMIN_API_PORT;
- int baseNettyPort = Helix.DEFAULT_SERVER_NETTY_PORT;
- int baseGrpcPort = Server.DEFAULT_GRPC_PORT;
- PinotConfiguration configuration = getDefaultServerConfiguration();
- overrideServerConf(configuration);
- try {
- for (int i = 0; i < numServers; i++) {
- configuration.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
- configuration.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkStr);
- configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
- configuration.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
- Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
- configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
- configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
- if (configuration.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, false)) {
- configuration.setProperty(Server.CONFIG_OF_GRPC_PORT, baseGrpcPort + i);
- }
- configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
- // Thread time measurement is disabled by default, enable it in integration tests.
- // TODO: this can be removed when we eventually enable thread time measurement by default.
- configuration.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
- HelixServerStarter helixServerStarter = new HelixServerStarter();
- helixServerStarter.init(configuration);
- helixServerStarter.start();
- _serverStarters.add(helixServerStarter);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
protected void stopMinion() {
assertNotNull(_minionStarter, "Minion is not started");
- try {
- _minionStarter.stop();
- } catch (Exception e) {
- LOGGER.error("Encountered exception while stopping minion {}", e.getMessage());
- }
+ _minionStarter.stop();
FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR));
_minionStarter = null;
}
+ protected void restartServers()
+ throws Exception {
+ assertNotNull(_serverStarters, "Servers are not started");
+ for (HelixServerStarter serverStarter : _serverStarters) {
+ serverStarter.stop();
+ }
+ int numServers = _serverStarters.size();
+ _serverStarters.clear();
+ for (int i = 0; i < numServers; i++) {
+ _serverStarters.add(startOneServer(i));
+ }
+ }
+
/**
* Upload all segments inside the given directory to the cluster.
*
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AdminConsoleIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AdminConsoleIntegrationTest.java
index 7be06f382e..f0a838b0a9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AdminConsoleIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AdminConsoleIntegrationTest.java
@@ -24,8 +24,6 @@ import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.controller.api.ControllerAdminApiApplication;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -36,7 +34,6 @@ import org.testng.annotations.Test;
* Tests that the controller, broker and server admin consoles return the expected pages.
*/
public class AdminConsoleIntegrationTest extends BaseClusterIntegrationTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(AdminConsoleIntegrationTest.class);
@BeforeClass
public void setUp()
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
index 3ab54fce7b..4f7677623c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
@@ -19,14 +19,14 @@
package org.apache.pinot.integration.tests;
import java.io.File;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -35,7 +35,7 @@ import org.testng.annotations.Test;
/**
* Integration test that converts Avro data for 12 segments and runs queries against it.
*/
-public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest {
private static final String TENANT_NAME = "TestTenant";
private static final int NUM_OFFLINE_SEGMENTS = 8;
private static final int NUM_REALTIME_SEGMENTS = 6;
@@ -51,7 +51,13 @@ public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest
}
@Override
- protected void overrideServerConf(PinotConfiguration configuration) {
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ brokerConf.setProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, "grpc");
+ }
+
+ @Override
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
}
@BeforeClass
@@ -102,16 +108,9 @@ public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest
// Start the Pinot cluster
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
-
startController(properties);
-
- startBrokers(1, DEFAULT_BROKER_PORT, getZkUrl(),
- Collections.singletonMap(CommonConstants.Broker.BROKER_REQUEST_HANDLER_TYPE, "grpc"));
-
- // Enable gRPC server
- PinotConfiguration serverConfig = getDefaultServerConfiguration();
- serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
- startServers(2, serverConfig);
+ startBrokers(1);
+ startServers(2);
// Create tenants
createBrokerTenant(TENANT_NAME, 1);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index fb59a384b6..e5757a879c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -23,10 +23,8 @@ import java.util.Map;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
-import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
-import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.Test;
@@ -53,26 +51,18 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
}
@Override
- protected void startServers() {
- startServers(NUM_SERVERS);
+ protected int getNumReplicas() {
+ return NUM_SERVERS;
}
@Test
public void testUpdateBrokerResource()
throws Exception {
// Add a new broker to the cluster
- Map<String, Object> properties = getDefaultBrokerConfiguration().toMap();
- String clusterName = getHelixClusterName();
- properties.put(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, clusterName);
- properties.put(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
- int port = NetUtils.findOpenPort(DEFAULT_BROKER_PORT);
- properties.put(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port);
- properties.put(CommonConstants.Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
- HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
- brokerStarter.init(new PinotConfiguration(properties));
- brokerStarter.start();
+ HelixBrokerStarter brokerStarter = startOneBroker(NUM_BROKERS);
// Check if broker is added to all the tables in broker resource
+ String clusterName = getHelixClusterName();
String brokerId = brokerStarter.getInstanceId();
IdealState brokerResourceIdealState =
_helixAdmin.getResourceIdealState(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
@@ -128,6 +118,41 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
assertFalse(_helixAdmin.getInstancesInCluster(clusterName).contains(brokerId));
}
+ // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+ @Test(enabled = false)
+ @Override
+ public void testStarTreeTriggering() {
+ // Ignored
+ }
+
+ // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+ @Test(enabled = false)
+ @Override
+ public void testDefaultColumns() {
+ // Ignored
+ }
+
+ // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+ @Test(enabled = false)
+ @Override
+ public void testBloomFilterTriggering() {
+ // Ignored
+ }
+
+ // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+ @Test(enabled = false)
+ @Override
+ public void testRangeIndexTriggering() {
+ // Ignored
+ }
+
+ // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+ @Test(enabled = false)
+ @Override
+ public void testInvertedIndexTriggering() {
+ // Ignored
+ }
+
@Test(enabled = false)
@Override
public void testHardcodedServerPartitionedSqlQueries() {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index b334f1476e..b1b52d89c6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -60,7 +60,6 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.NetUtils;
@@ -166,7 +165,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Start the Pinot cluster
startZk();
startController();
- startBrokers(getNumBrokers());
+ startBrokers();
startServers();
// Create and upload the schema and table config
@@ -217,10 +216,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
waitForAllDocsLoaded(600_000L);
}
- protected void startServers() {
- // Enable gRPC server
- PinotConfiguration serverConfig = getDefaultServerConfiguration();
- startServer(serverConfig);
+ protected void startBrokers()
+ throws Exception {
+ startBrokers(getNumBrokers());
+ }
+
+ protected void startServers()
+ throws Exception {
+ startServers(getNumServers());
}
private void registerCallbackHandlers() {
@@ -428,8 +431,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
} while (System.currentTimeMillis() < endTimeMs);
- throw new TimeoutException(String
- .format("Time out while waiting segments become ONLINE. (tableNameWithType = %s)", tableNameWithType));
+ throw new TimeoutException(
+ String.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s)", tableNameWithType));
}
@Test(dependsOnMethods = "testRangeIndexTriggering")
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index ee8ce590a2..cf9f9d2a9c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -18,18 +18,15 @@
*/
package org.apache.pinot.integration.tests;
-import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.helix.model.IdealState;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -38,7 +35,6 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -47,30 +43,11 @@ import org.testng.annotations.Test;
import static org.testng.Assert.*;
-public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTestSet {
- private static final int NUM_BROKERS = 1;
- private static final int NUM_SERVERS = 1;
- private static final int NUM_SEGMENTS = 12;
-
- private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
- new ArrayList<>(getNumBrokers() + getNumServers());
- private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
- // Cache the table size after removing an index via reloading. Once this value
- // is set, assert that table size always gets back to this value after removing
- // any other kind of index.
- private long _tableSizeAfterRemovingIndex;
-
- protected int getNumBrokers() {
- return NUM_BROKERS;
- }
-
- protected int getNumServers() {
- return NUM_SERVERS;
- }
+public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest {
@Override
- protected String getSchemaFileName() {
- return _schemaFileName;
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
}
@BeforeClass
@@ -81,8 +58,8 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest
// Start the Pinot cluster
startZk();
startController();
- startBrokers(getNumBrokers());
- startServers();
+ startBroker();
+ startServer();
// Create and upload the schema and table config
Schema schema = createSchema();
@@ -103,53 +80,10 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest
// Initialize the query generator
setUpQueryGenerator(avroFiles);
- // Set up service status callbacks
- // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
- // resources to monitor
- registerCallbackHandlers();
-
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
}
- void setExtraServerConfigs(PinotConfiguration serverConfig) {
- }
-
- protected void startServers() {
- // Enable gRPC server
- PinotConfiguration serverConfig = getDefaultServerConfiguration();
- serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
- setExtraServerConfigs(serverConfig);
- startServer(serverConfig);
- }
-
- private void registerCallbackHandlers() {
- List<String> instances = _helixAdmin.getInstancesInCluster(getHelixClusterName());
- instances.removeIf(
- instance -> (!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !instance.startsWith(
- CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)));
- List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(getHelixClusterName());
- resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource)
- && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource)));
- for (String instance : instances) {
- List<String> resourcesToMonitor = new ArrayList<>();
- for (String resourceName : resourcesInCluster) {
- IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName);
- for (String partitionName : idealState.getPartitionSet()) {
- if (idealState.getInstanceSet(partitionName).contains(instance)) {
- resourcesToMonitor.add(resourceName);
- break;
- }
- }
- }
- _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(
- new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, getHelixClusterName(),
- instance, resourcesToMonitor, 100.0),
- new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, getHelixClusterName(),
- instance, resourcesToMonitor, 100.0))));
- }
- }
-
public GrpcQueryClient getGrpcQueryClient() {
return new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
index 1183b79202..7826c692ea 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
public class OfflineSecureGRPCServerIntegrationTest extends OfflineGRPCServerIntegrationTest {
@@ -33,16 +33,17 @@ public class OfflineSecureGRPCServerIntegrationTest extends OfflineGRPCServerInt
private final URL _tlsStoreJKS = OfflineSecureGRPCServerIntegrationTest.class.getResource("/tlstest.jks");
@Override
- protected void setExtraServerConfigs(PinotConfiguration serverConfig) {
- serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_GRPCTLS_SERVER_ENABLED, true);
- serverConfig.setProperty("pinot.server.grpctls.client.auth.enabled", true);
- serverConfig.setProperty("pinot.server.grpctls.keystore.type", JKS);
- serverConfig.setProperty("pinot.server.grpctls.keystore.path", _tlsStoreJKS);
- serverConfig.setProperty("pinot.server.grpctls.keystore.password", PASSWORD);
- serverConfig.setProperty("pinot.server.grpctls.truststore.type", JKS);
- serverConfig.setProperty("pinot.server.grpctls.truststore.path", _tlsStoreJKS);
- serverConfig.setProperty("pinot.server.grpctls.truststore.password", PASSWORD);
- serverConfig.setProperty("pinot.server.grpctls.ssl.provider", JDK);
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
+ serverConf.setProperty(Server.CONFIG_OF_GRPCTLS_SERVER_ENABLED, true);
+ serverConf.setProperty("pinot.server.grpctls.client.auth.enabled", true);
+ serverConf.setProperty("pinot.server.grpctls.keystore.type", JKS);
+ serverConf.setProperty("pinot.server.grpctls.keystore.path", _tlsStoreJKS);
+ serverConf.setProperty("pinot.server.grpctls.keystore.password", PASSWORD);
+ serverConf.setProperty("pinot.server.grpctls.truststore.type", JKS);
+ serverConf.setProperty("pinot.server.grpctls.truststore.path", _tlsStoreJKS);
+ serverConf.setProperty("pinot.server.grpctls.truststore.password", PASSWORD);
+ serverConf.setProperty("pinot.server.grpctls.ssl.provider", JDK);
}
@Override
@@ -57,6 +58,6 @@ public class OfflineSecureGRPCServerIntegrationTest extends OfflineGRPCServerInt
configMap.put("tls.truststore.type", JKS);
configMap.put("tls.ssl.provider", JDK);
GrpcQueryClient.Config config = new GrpcQueryClient.Config(configMap);
- return new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT, config);
+ return new GrpcQueryClient("localhost", Server.DEFAULT_GRPC_PORT, config);
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
index 5067eb1821..77a5fd959c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
@@ -114,7 +114,8 @@ public class PeerDownloadLLCRealtimeClusterIntegrationTest extends RealtimeClust
}
@Override
- public void startServer() {
+ public void startServer()
+ throws Exception {
startServers(NUM_SERVERS);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 09d698d534..616f150d52 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -173,7 +173,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
verifyTableIdealStates(idealState);
// Restart the servers and check every segment is not in ERROR state.
- restartServers(NUM_SERVERS);
+ restartServers();
verifyTableIdealStates(idealState);
ExternalView ev =
HelixHelper.getExternalViewForResource(_helixAdmin, this.getHelixClusterName(), TABLE_NAME_WITH_TYPE);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org