You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/11 21:17:30 UTC

[pinot] branch master updated: Cleanup and simplify the integration test instance start (#8507)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 68c89b74f5 Cleanup and simplify the integration test instance start (#8507)
68c89b74f5 is described below

commit 68c89b74f5111a0c53d632f932f080b5e394ff44
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Apr 11 14:17:24 2022 -0700

    Cleanup and simplify the integration test instance start (#8507)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |   5 +
 .../pinot/integration/tests/ClusterTest.java       | 286 +++++++--------------
 .../tests/AdminConsoleIntegrationTest.java         |   3 -
 .../tests/GrpcBrokerClusterIntegrationTest.java    |  25 +-
 .../MultiNodesOfflineClusterIntegrationTest.java   |  53 +++-
 .../tests/OfflineClusterIntegrationTest.java       |  19 +-
 .../tests/OfflineGRPCServerIntegrationTest.java    |  76 +-----
 .../OfflineSecureGRPCServerIntegrationTest.java    |  25 +-
 ...rDownloadLLCRealtimeClusterIntegrationTest.java |   3 +-
 .../UpsertTableSegmentUploadIntegrationTest.java   |   2 +-
 10 files changed, 188 insertions(+), 309 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 9b467e77d4..852fc5ae41 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -148,6 +148,10 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
         _brokerConf.getProperty(Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS, Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
   }
 
+  public int getPort() {
+    return _port;
+  }
+
   /**
    * Adds an ideal state change handler to handle Helix ideal state change callbacks.
    * <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change
@@ -261,6 +265,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
                 queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null);
       }
     }
+    _brokerRequestHandler.start();
 
     LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _brokerAdminApplication =
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index ed28045f04..ea15a66ce1 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -81,7 +81,6 @@ import static org.testng.Assert.assertTrue;
  * Base class for integration tests that involve a complete Pinot cluster.
  */
 public abstract class ClusterTest extends ControllerTest {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTest.class);
   protected static final int DEFAULT_BROKER_PORT = 18099;
   protected static final Random RANDOM = new Random(System.currentTimeMillis());
 
@@ -96,75 +95,64 @@ public abstract class ClusterTest extends ControllerTest {
     return new PinotConfiguration();
   }
 
-  protected void startBroker()
-      throws Exception {
-    startBrokers(1);
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    // Do nothing, to be overridden by tests if they need something specific
   }
 
-  protected void startBroker(int port, String zkStr)
+  protected void startBroker()
       throws Exception {
-    startBrokers(1, port, zkStr, Collections.emptyMap());
+    startBrokers(1);
   }
 
   protected void startBrokers(int numBrokers)
       throws Exception {
-    startBrokers(numBrokers, DEFAULT_BROKER_PORT, getZkUrl(), Collections.emptyMap());
-  }
-
-  protected void startBrokers(int numBrokers, int basePort, String zkStr)
-      throws Exception {
-    startBrokers(numBrokers, basePort, zkStr, Collections.emptyMap());
-  }
-
-  protected void startBrokers(int numBrokers, int basePort, String zkStr, Map<String, Object> extraProperties)
-      throws Exception {
     _brokerStarters = new ArrayList<>(numBrokers);
     _brokerPorts = new ArrayList<>();
     for (int i = 0; i < numBrokers; i++) {
-      Map<String, Object> properties = getDefaultBrokerConfiguration().toMap();
-      properties.put(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
-      properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkStr);
-      properties.put(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
-      int port = NetUtils.findOpenPort(basePort + i);
-      _brokerPorts.add(port);
-      properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, port);
-      properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
-      properties.putAll(extraProperties);
-      PinotConfiguration configuration = new PinotConfiguration(properties);
-      overrideBrokerConf(configuration);
-
-      HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
-      brokerStarter.init(configuration);
-      brokerStarter.start();
+      HelixBrokerStarter brokerStarter = startOneBroker(i);
       _brokerStarters.add(brokerStarter);
+      _brokerPorts.add(brokerStarter.getPort());
     }
     _brokerBaseApiUrl = "http://localhost:" + _brokerPorts.get(0);
   }
 
+  protected HelixBrokerStarter startOneBroker(int brokerId)
+      throws Exception {
+    PinotConfiguration brokerConf = getDefaultBrokerConfiguration();
+    brokerConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+    brokerConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+    brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+    brokerConf.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, NetUtils.findOpenPort(DEFAULT_BROKER_PORT + brokerId));
+    brokerConf.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+    overrideBrokerConf(brokerConf);
+
+    HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
+    brokerStarter.init(brokerConf);
+    brokerStarter.start();
+    return brokerStarter;
+  }
+
   protected void startBrokerHttps()
       throws Exception {
     _brokerStarters = new ArrayList<>();
     _brokerPorts = new ArrayList<>();
 
-    Map<String, Object> properties = getDefaultBrokerConfiguration().toMap();
-    properties.put(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
-    properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
-
-    properties.put(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
-    properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
-    properties.put(Broker.CONFIG_OF_BROKER_HOSTNAME, LOCAL_HOST);
-
-    PinotConfiguration configuration = new PinotConfiguration(properties);
-    overrideBrokerConf(configuration);
+    PinotConfiguration brokerConf = getDefaultBrokerConfiguration();
+    brokerConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+    brokerConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+    brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+    brokerConf.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME, LOCAL_HOST);
+    brokerConf.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+    overrideBrokerConf(brokerConf);
 
     HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
-    brokerStarter.init(configuration);
+    brokerStarter.init(brokerConf);
     brokerStarter.start();
     _brokerStarters.add(brokerStarter);
 
     // TLS configs require hard-coding
     _brokerPorts.add(DEFAULT_BROKER_PORT);
-    _brokerBaseApiUrl = "https://localhost:" + _brokerPorts.get(0);
+    _brokerBaseApiUrl = "https://localhost:" + DEFAULT_BROKER_PORT;
   }
 
   protected int getRandomBrokerPort() {
@@ -189,85 +177,64 @@ public abstract class ClusterTest extends ControllerTest {
     return configuration;
   }
 
-  protected void startServer() {
-    startServers(1);
-  }
-
-  protected void startServer(PinotConfiguration configuration) {
-    startServers(1, configuration);
-  }
-
-  protected void startServers(int numServers) {
-    startServers(numServers, getDefaultServerConfiguration());
-  }
-
-  protected void startServers(int numServers, PinotConfiguration configuration) {
-    startServers(numServers, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
-  }
-
-  protected void startServers(int numServers, int baseAdminApiPort, int baseNettyPort, String zkStr) {
-    startServers(numServers, getDefaultServerConfiguration(), baseAdminApiPort, baseNettyPort, zkStr);
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    // Do nothing, to be overridden by tests if they need something specific
   }
 
-  protected void startServers(int numServers, PinotConfiguration configuration, int baseAdminApiPort, int baseNettyPort,
-      String zkStr) {
-    startServers(numServers, configuration, baseAdminApiPort, baseNettyPort, Server.DEFAULT_GRPC_PORT, zkStr);
+  protected void startServer()
+      throws Exception {
+    startServers(1);
   }
 
-  protected void startServers(int numServers, PinotConfiguration configuration, int baseAdminApiPort, int baseNettyPort,
-      int baseGrpcPort, String zkStr) {
+  protected void startServers(int numServers)
+      throws Exception {
     FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
     _serverStarters = new ArrayList<>(numServers);
-    overrideServerConf(configuration);
-    try {
-      for (int i = 0; i < numServers; i++) {
-        configuration.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
-        configuration.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkStr);
-        configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
-        configuration.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
-            Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
-        configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
-        configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
-        if (configuration.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, false)) {
-          configuration.setProperty(Server.CONFIG_OF_GRPC_PORT, baseGrpcPort + i);
-        }
-        configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
-        // Thread time measurement is disabled by default, enable it in integration tests.
-        // TODO: this can be removed when we eventually enable thread time measurement by default.
-        configuration.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
-        HelixServerStarter helixServerStarter = new HelixServerStarter();
-        helixServerStarter.init(configuration);
-        helixServerStarter.start();
-        _serverStarters.add(helixServerStarter);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    for (int i = 0; i < numServers; i++) {
+      _serverStarters.add(startOneServer(i));
     }
   }
 
-  protected List<HelixServerStarter> getServerStarters() {
-    return _serverStarters;
-  }
-
-  protected void startServerHttps() {
+  protected HelixServerStarter startOneServer(int serverId)
+      throws Exception {
+    PinotConfiguration serverConf = getDefaultServerConfiguration();
+    serverConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+    serverConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+    serverConf.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + serverId);
+    serverConf.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
+        Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + serverId);
+    serverConf.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, Server.DEFAULT_ADMIN_API_PORT - serverId);
+    serverConf.setProperty(Server.CONFIG_OF_NETTY_PORT, Helix.DEFAULT_SERVER_NETTY_PORT + serverId);
+    serverConf.setProperty(Server.CONFIG_OF_GRPC_PORT, Server.DEFAULT_GRPC_PORT + serverId);
+    // Thread time measurement is disabled by default, enable it in integration tests.
+    // TODO: this can be removed when we eventually enable thread time measurement by default.
+    serverConf.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    overrideServerConf(serverConf);
+
+    HelixServerStarter serverStarter = new HelixServerStarter();
+    serverStarter.init(serverConf);
+    serverStarter.start();
+    return serverStarter;
+  }
+
+  protected void startServerHttps()
+      throws Exception {
     FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
     _serverStarters = new ArrayList<>();
 
-    Map<String, Object> properties = getDefaultServerConfiguration().toMap();
-    properties.put(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
-    properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+    PinotConfiguration serverConf = getDefaultServerConfiguration();
+    serverConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+    serverConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+    overrideServerConf(serverConf);
 
-    PinotConfiguration configuration = new PinotConfiguration(properties);
-    overrideServerConf(configuration);
+    HelixServerStarter serverStarter = new HelixServerStarter();
+    serverStarter.init(serverConf);
+    serverStarter.start();
+    _serverStarters.add(serverStarter);
+  }
 
-    try {
-      HelixServerStarter helixServerStarter = new HelixServerStarter();
-      helixServerStarter.init(configuration);
-      _serverStarters.add(helixServerStarter);
-      helixServerStarter.start();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+  protected List<HelixServerStarter> getServerStarters() {
+    return _serverStarters;
   }
 
   protected PinotConfiguration getDefaultMinionConfiguration() {
@@ -276,107 +243,54 @@ public abstract class ClusterTest extends ControllerTest {
 
   // NOTE: We don't allow multiple Minion instances in the same JVM because Minion uses singleton class MinionContext
   //       to manage the instance level configs
-  protected void startMinion() {
+  protected void startMinion()
+      throws Exception {
     FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR));
-    try {
-      PinotConfiguration minionConf = getDefaultMinionConfiguration();
-      minionConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
-      minionConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
-      _minionStarter = new MinionStarter();
-      _minionStarter.init(minionConf);
-      _minionStarter.start();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  protected void overrideServerConf(PinotConfiguration configuration) {
-    // Do nothing, to be overridden by tests if they need something specific
-  }
-
-  protected void overrideBrokerConf(PinotConfiguration configuration) {
-    // Do nothing, to be overridden by tests if they need something specific
+    PinotConfiguration minionConf = getDefaultMinionConfiguration();
+    minionConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
+    minionConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+    _minionStarter = new MinionStarter();
+    _minionStarter.init(minionConf);
+    _minionStarter.start();
   }
 
   protected void stopBroker() {
     assertNotNull(_brokerStarters, "Brokers are not started");
     for (HelixBrokerStarter brokerStarter : _brokerStarters) {
-      try {
-        brokerStarter.stop();
-      } catch (Exception e) {
-        LOGGER.error("Encountered exception while stopping broker {}", e.getMessage());
-      }
+      brokerStarter.stop();
     }
     _brokerStarters = null;
   }
 
   protected void stopServer() {
     assertNotNull(_serverStarters, "Servers are not started");
-    for (HelixServerStarter helixServerStarter : _serverStarters) {
-      try {
-        helixServerStarter.stop();
-      } catch (Exception e) {
-        LOGGER.error("Encountered exception while stopping server {}", e.getMessage());
-      }
+    for (HelixServerStarter serverStarter : _serverStarters) {
+      serverStarter.stop();
     }
     FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
     _serverStarters = null;
   }
 
-  protected void restartServers(int numServers) {
-    assertNotNull(_serverStarters, "Servers are not started");
-    for (HelixServerStarter helixServerStarter : _serverStarters) {
-      try {
-        helixServerStarter.stop();
-      } catch (Exception e) {
-        LOGGER.error("Encountered exception while stopping server {}", e.getMessage());
-      }
-    }
-
-    _serverStarters = new ArrayList<>(numServers);
-    String zkStr = getZkUrl();
-    int baseAdminApiPort = Server.DEFAULT_ADMIN_API_PORT;
-    int baseNettyPort = Helix.DEFAULT_SERVER_NETTY_PORT;
-    int baseGrpcPort = Server.DEFAULT_GRPC_PORT;
-    PinotConfiguration configuration = getDefaultServerConfiguration();
-    overrideServerConf(configuration);
-    try {
-      for (int i = 0; i < numServers; i++) {
-        configuration.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
-        configuration.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkStr);
-        configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
-        configuration.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
-            Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
-        configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
-        configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
-        if (configuration.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, false)) {
-          configuration.setProperty(Server.CONFIG_OF_GRPC_PORT, baseGrpcPort + i);
-        }
-        configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
-        // Thread time measurement is disabled by default, enable it in integration tests.
-        // TODO: this can be removed when we eventually enable thread time measurement by default.
-        configuration.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
-        HelixServerStarter helixServerStarter = new HelixServerStarter();
-        helixServerStarter.init(configuration);
-        helixServerStarter.start();
-        _serverStarters.add(helixServerStarter);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   protected void stopMinion() {
     assertNotNull(_minionStarter, "Minion is not started");
-    try {
-      _minionStarter.stop();
-    } catch (Exception e) {
-      LOGGER.error("Encountered exception while stopping minion {}", e.getMessage());
-    }
+    _minionStarter.stop();
     FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR));
     _minionStarter = null;
   }
 
+  protected void restartServers()
+      throws Exception {
+    assertNotNull(_serverStarters, "Servers are not started");
+    for (HelixServerStarter serverStarter : _serverStarters) {
+      serverStarter.stop();
+    }
+    int numServers = _serverStarters.size();
+    _serverStarters.clear();
+    for (int i = 0; i < numServers; i++) {
+      _serverStarters.add(startOneServer(i));
+    }
+  }
+
   /**
    * Upload all segments inside the given directory to the cluster.
    *
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AdminConsoleIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AdminConsoleIntegrationTest.java
index 7be06f382e..f0a838b0a9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AdminConsoleIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AdminConsoleIntegrationTest.java
@@ -24,8 +24,6 @@ import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
 import org.apache.pinot.controller.api.ControllerAdminApiApplication;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -36,7 +34,6 @@ import org.testng.annotations.Test;
  * Tests that the controller, broker and server admin consoles return the expected pages.
  */
 public class AdminConsoleIntegrationTest extends BaseClusterIntegrationTest {
-  private static final Logger LOGGER = LoggerFactory.getLogger(AdminConsoleIntegrationTest.class);
 
   @BeforeClass
   public void setUp()
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
index 3ab54fce7b..4f7677623c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
@@ -19,14 +19,14 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -35,7 +35,7 @@ import org.testng.annotations.Test;
 /**
  * Integration test that converts Avro data for 12 segments and runs queries against it.
  */
-public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest {
   private static final String TENANT_NAME = "TestTenant";
   private static final int NUM_OFFLINE_SEGMENTS = 8;
   private static final int NUM_REALTIME_SEGMENTS = 6;
@@ -51,7 +51,13 @@ public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest
   }
 
   @Override
-  protected void overrideServerConf(PinotConfiguration configuration) {
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    brokerConf.setProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, "grpc");
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
   }
 
   @BeforeClass
@@ -102,16 +108,9 @@ public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest
     // Start the Pinot cluster
     Map<String, Object> properties = getDefaultControllerConfiguration();
     properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
-
     startController(properties);
-
-    startBrokers(1, DEFAULT_BROKER_PORT, getZkUrl(),
-        Collections.singletonMap(CommonConstants.Broker.BROKER_REQUEST_HANDLER_TYPE, "grpc"));
-
-    // Enable gRPC server
-    PinotConfiguration serverConfig = getDefaultServerConfiguration();
-    serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
-    startServers(2, serverConfig);
+    startBrokers(1);
+    startServers(2);
 
     // Create tenants
     createBrokerTenant(TENANT_NAME, 1);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index fb59a384b6..e5757a879c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -23,10 +23,8 @@ import java.util.Map;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
-import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
-import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.Test;
 
@@ -53,26 +51,18 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
   }
 
   @Override
-  protected void startServers() {
-    startServers(NUM_SERVERS);
+  protected int getNumReplicas() {
+    return NUM_SERVERS;
   }
 
   @Test
   public void testUpdateBrokerResource()
       throws Exception {
     // Add a new broker to the cluster
-    Map<String, Object> properties = getDefaultBrokerConfiguration().toMap();
-    String clusterName = getHelixClusterName();
-    properties.put(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, clusterName);
-    properties.put(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
-    int port = NetUtils.findOpenPort(DEFAULT_BROKER_PORT);
-    properties.put(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port);
-    properties.put(CommonConstants.Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
-    HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
-    brokerStarter.init(new PinotConfiguration(properties));
-    brokerStarter.start();
+    HelixBrokerStarter brokerStarter = startOneBroker(NUM_BROKERS);
 
     // Check if broker is added to all the tables in broker resource
+    String clusterName = getHelixClusterName();
     String brokerId = brokerStarter.getInstanceId();
     IdealState brokerResourceIdealState =
         _helixAdmin.getResourceIdealState(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
@@ -128,6 +118,41 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
     assertFalse(_helixAdmin.getInstancesInCluster(clusterName).contains(brokerId));
   }
 
+  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+  @Test(enabled = false)
+  @Override
+  public void testStarTreeTriggering() {
+    // Ignored
+  }
+
+  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+  @Test(enabled = false)
+  @Override
+  public void testDefaultColumns() {
+    // Ignored
+  }
+
+  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+  @Test(enabled = false)
+  @Override
+  public void testBloomFilterTriggering() {
+    // Ignored
+  }
+
+  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+  @Test(enabled = false)
+  @Override
+  public void testRangeIndexTriggering() {
+    // Ignored
+  }
+
+  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
+  @Test(enabled = false)
+  @Override
+  public void testInvertedIndexTriggering() {
+    // Ignored
+  }
+
   @Test(enabled = false)
   @Override
   public void testHardcodedServerPartitionedSqlQueries() {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index b334f1476e..b1b52d89c6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -60,7 +60,6 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.NetUtils;
@@ -166,7 +165,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Start the Pinot cluster
     startZk();
     startController();
-    startBrokers(getNumBrokers());
+    startBrokers();
     startServers();
 
     // Create and upload the schema and table config
@@ -217,10 +216,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     waitForAllDocsLoaded(600_000L);
   }
 
-  protected void startServers() {
-    // Enable gRPC server
-    PinotConfiguration serverConfig = getDefaultServerConfiguration();
-    startServer(serverConfig);
+  protected void startBrokers()
+      throws Exception {
+    startBrokers(getNumBrokers());
+  }
+
+  protected void startServers()
+      throws Exception {
+    startServers(getNumServers());
   }
 
   private void registerCallbackHandlers() {
@@ -428,8 +431,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       }
       Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
     } while (System.currentTimeMillis() < endTimeMs);
-    throw new TimeoutException(String
-        .format("Time out while waiting segments become ONLINE. (tableNameWithType = %s)", tableNameWithType));
+    throw new TimeoutException(
+        String.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s)", tableNameWithType));
   }
 
   @Test(dependsOnMethods = "testRangeIndexTriggering")
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index ee8ce590a2..cf9f9d2a9c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -18,18 +18,15 @@
  */
 package org.apache.pinot.integration.tests;
 
-import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -38,7 +35,6 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -47,30 +43,11 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.*;
 
 
-public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTestSet {
-  private static final int NUM_BROKERS = 1;
-  private static final int NUM_SERVERS = 1;
-  private static final int NUM_SEGMENTS = 12;
-
-  private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
-      new ArrayList<>(getNumBrokers() + getNumServers());
-  private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
-  // Cache the table size after removing an index via reloading. Once this value
-  // is set, assert that table size always gets back to this value after removing
-  // any other kind of index.
-  private long _tableSizeAfterRemovingIndex;
-
-  protected int getNumBrokers() {
-    return NUM_BROKERS;
-  }
-
-  protected int getNumServers() {
-    return NUM_SERVERS;
-  }
+public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest {
 
   @Override
-  protected String getSchemaFileName() {
-    return _schemaFileName;
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
   }
 
   @BeforeClass
@@ -81,8 +58,8 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest
     // Start the Pinot cluster
     startZk();
     startController();
-    startBrokers(getNumBrokers());
-    startServers();
+    startBroker();
+    startServer();
 
     // Create and upload the schema and table config
     Schema schema = createSchema();
@@ -103,53 +80,10 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest
     // Initialize the query generator
     setUpQueryGenerator(avroFiles);
 
-    // Set up service status callbacks
-    // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
-    // resources to monitor
-    registerCallbackHandlers();
-
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
   }
 
-  void setExtraServerConfigs(PinotConfiguration serverConfig) {
-  }
-
-  protected void startServers() {
-    // Enable gRPC server
-    PinotConfiguration serverConfig = getDefaultServerConfiguration();
-    serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
-    setExtraServerConfigs(serverConfig);
-    startServer(serverConfig);
-  }
-
-  private void registerCallbackHandlers() {
-    List<String> instances = _helixAdmin.getInstancesInCluster(getHelixClusterName());
-    instances.removeIf(
-        instance -> (!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !instance.startsWith(
-            CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)));
-    List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(getHelixClusterName());
-    resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource)
-        && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource)));
-    for (String instance : instances) {
-      List<String> resourcesToMonitor = new ArrayList<>();
-      for (String resourceName : resourcesInCluster) {
-        IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName);
-        for (String partitionName : idealState.getPartitionSet()) {
-          if (idealState.getInstanceSet(partitionName).contains(instance)) {
-            resourcesToMonitor.add(resourceName);
-            break;
-          }
-        }
-      }
-      _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(
-          new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, getHelixClusterName(),
-              instance, resourcesToMonitor, 100.0),
-          new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, getHelixClusterName(),
-              instance, resourcesToMonitor, 100.0))));
-    }
-  }
-
   public GrpcQueryClient getGrpcQueryClient() {
     return new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT);
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
index 1183b79202..7826c692ea 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
 
 
 public class OfflineSecureGRPCServerIntegrationTest extends OfflineGRPCServerIntegrationTest {
@@ -33,16 +33,17 @@ public class OfflineSecureGRPCServerIntegrationTest extends OfflineGRPCServerInt
   private final URL _tlsStoreJKS = OfflineSecureGRPCServerIntegrationTest.class.getResource("/tlstest.jks");
 
   @Override
-  protected void setExtraServerConfigs(PinotConfiguration serverConfig) {
-    serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_GRPCTLS_SERVER_ENABLED, true);
-    serverConfig.setProperty("pinot.server.grpctls.client.auth.enabled", true);
-    serverConfig.setProperty("pinot.server.grpctls.keystore.type", JKS);
-    serverConfig.setProperty("pinot.server.grpctls.keystore.path", _tlsStoreJKS);
-    serverConfig.setProperty("pinot.server.grpctls.keystore.password", PASSWORD);
-    serverConfig.setProperty("pinot.server.grpctls.truststore.type", JKS);
-    serverConfig.setProperty("pinot.server.grpctls.truststore.path", _tlsStoreJKS);
-    serverConfig.setProperty("pinot.server.grpctls.truststore.password", PASSWORD);
-    serverConfig.setProperty("pinot.server.grpctls.ssl.provider", JDK);
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
+    serverConf.setProperty(Server.CONFIG_OF_GRPCTLS_SERVER_ENABLED, true);
+    serverConf.setProperty("pinot.server.grpctls.client.auth.enabled", true);
+    serverConf.setProperty("pinot.server.grpctls.keystore.type", JKS);
+    serverConf.setProperty("pinot.server.grpctls.keystore.path", _tlsStoreJKS);
+    serverConf.setProperty("pinot.server.grpctls.keystore.password", PASSWORD);
+    serverConf.setProperty("pinot.server.grpctls.truststore.type", JKS);
+    serverConf.setProperty("pinot.server.grpctls.truststore.path", _tlsStoreJKS);
+    serverConf.setProperty("pinot.server.grpctls.truststore.password", PASSWORD);
+    serverConf.setProperty("pinot.server.grpctls.ssl.provider", JDK);
   }
 
   @Override
@@ -57,6 +58,6 @@ public class OfflineSecureGRPCServerIntegrationTest extends OfflineGRPCServerInt
     configMap.put("tls.truststore.type", JKS);
     configMap.put("tls.ssl.provider", JDK);
     GrpcQueryClient.Config config = new GrpcQueryClient.Config(configMap);
-    return new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT, config);
+    return new GrpcQueryClient("localhost", Server.DEFAULT_GRPC_PORT, config);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
index 5067eb1821..77a5fd959c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
@@ -114,7 +114,8 @@ public class PeerDownloadLLCRealtimeClusterIntegrationTest extends RealtimeClust
   }
 
   @Override
-  public void startServer() {
+  public void startServer()
+      throws Exception {
     startServers(NUM_SERVERS);
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 09d698d534..616f150d52 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -173,7 +173,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
     verifyTableIdealStates(idealState);
 
     // Restart the servers and check every segment is not in ERROR state.
-    restartServers(NUM_SERVERS);
+    restartServers();
     verifyTableIdealStates(idealState);
     ExternalView ev =
         HelixHelper.getExternalViewForResource(_helixAdmin, this.getHelixClusterName(), TABLE_NAME_WITH_TYPE);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org