You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/10 05:32:25 UTC

[incubator-pinot] 01/01: Improve controller tests

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch helix_debug
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5b9cfaff5aabe4195220191902b5f4cfb5d58e87
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Tue Jul 9 22:32:04 2019 -0700

    Improve controller tests
---
 .../broker/broker/helix/ClusterChangeMediator.java |  12 +-
 .../apache/pinot/controller/ControllerConf.java    |  22 +--
 .../apache/pinot/controller/ControllerStarter.java |  15 +-
 .../api/resources/PinotControllerHealthCheck.java  |   4 +-
 .../resources/PinotTableConfigRestletResource.java |   2 +
 .../api/resources/PinotTableRestletResource.java   |  10 +-
 .../helix/core/PinotHelixResourceManager.java      |  33 ++--
 .../helix/core/SegmentDeletionManager.java         |  17 +-
 .../helix/core/util/HelixSetupUtils.java           | 180 ++++++++-------------
 .../controller/validation/StorageQuotaChecker.java |  12 +-
 .../helix/ControllerPeriodicTaskStarterTest.java   |  23 +--
 .../pinot/controller/helix/ControllerTest.java     |  47 ++----
 .../controller/helix/PinotControllerModeTest.java  |  10 +-
 .../tests/BaseClusterIntegrationTest.java          |   3 +-
 .../tests/OfflineClusterIntegrationTest.java       |  13 +-
 15 files changed, 145 insertions(+), 258 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 64b2a1e..72be04d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -62,7 +62,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
 
   private final Thread _clusterChangeHandlingThread;
 
-  private volatile boolean _stopped = false;
+  private boolean _stopped = false;
 
   public ClusterChangeMediator(Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap,
       BrokerMetrics brokerMetrics) {
@@ -145,7 +145,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
   /**
    * Starts the cluster change mediator.
    */
-  public void start() {
+  public synchronized void start() {
     LOGGER.info("Starting the cluster change handling thread");
     _clusterChangeHandlingThread.start();
   }
@@ -153,7 +153,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
   /**
    * Stops the cluster change mediator.
    */
-  public void stop() {
+  public synchronized void stop() {
     LOGGER.info("Stopping the cluster change handling thread");
     _stopped = true;
     synchronized (_lastChangeTimeMap) {
@@ -197,7 +197,11 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
    *
    * @param changeType Type of the change
    */
-  private void enqueueChange(ChangeType changeType) {
+  private synchronized void enqueueChange(ChangeType changeType) {
+    // Do not enqueue changes if already stopped
+    if (_stopped) {
+      return;
+    }
     if (_clusterChangeHandlingThread.isAlive()) {
       LOGGER.info("Enqueue {} change", changeType);
       synchronized (_lastChangeTimeMap) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 697fcf3..414d3bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller;
 import java.io.File;
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,9 +58,7 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String CONTROLLER_MODE = "controller.mode";
 
   public enum ControllerMode {
-    DUAL,
-    PINOT_ONLY,
-    HELIX_ONLY
+    DUAL, PINOT_ONLY, HELIX_ONLY
   }
 
   public static class ControllerPeriodicTasksConf {
@@ -175,22 +172,17 @@ public class ControllerConf extends PropertiesConfiguration {
    * Returns the URI for the given path, appends the local (file) scheme to the URI if no scheme exists.
    */
   public static URI getUriFromPath(String path) {
-    try {
-      URI uri = new URI(path);
-      if (uri.getScheme() != null) {
-        return uri;
-      } else {
-        return new URI(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME, path, null);
-      }
-    } catch (URISyntaxException e) {
-      LOGGER.error("Could not construct uri from path {}", path);
-      throw new RuntimeException(e);
+    URI uri = URI.create(path);
+    if (uri.getScheme() == null) {
+      uri = URI.create(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME + ":" + path);
     }
+    return uri;
   }
 
   public static URI constructSegmentLocation(String baseDataDir, String tableName, String segmentName) {
     try {
-      return getUriFromPath(StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8")));
+      return getUriFromPath(
+          StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8")));
     } catch (UnsupportedEncodingException e) {
       LOGGER
           .error("Could not construct segment location with baseDataDir {}, tableName {}, segmentName {}", baseDataDir,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index c313616..ac7dbff 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.yammer.metrics.core.MetricsRegistry;
@@ -236,12 +237,10 @@ public class ControllerStarter {
   }
 
   private void setUpPinotController() {
-    // Note: Right now we don't allow pinot-only mode to be used in production yet.
-    // Now we only have this mode used in tests.
-    // TODO: Remove this logic once all the helix separation PRs are committed.
-    if (_controllerMode == ControllerConf.ControllerMode.PINOT_ONLY && !isPinotOnlyModeSupported()) {
-      throw new RuntimeException("Pinot only controller currently isn't supported in production yet.");
-    }
+    // Note: Right now we don't allow Pinot-only controller as ControllerLeadershipManager is set up in the Helix
+    //       controller and the Pinot controller relies on it
+    // TODO: Remove ControllerLeadershipManager
+    Preconditions.checkState(_controllerLeadershipManager != null);
 
     // Set up Pinot cluster in Helix
     HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode);
@@ -506,10 +505,6 @@ public class ControllerStarter {
     }
   }
 
-  public boolean isPinotOnlyModeSupported() {
-    return false;
-  }
-
   public MetricsRegistry getMetricsRegistry() {
     return _metricsRegistry;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
index e6c5182..7370085 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
@@ -32,14 +32,14 @@ import org.apache.pinot.controller.ControllerConf;
 
 
 @Api(tags = Constants.HEALTH_TAG)
-@Path("/pinot-controller/admin")
+@Path("/")
 public class PinotControllerHealthCheck {
 
   @Inject
   ControllerConf controllerConf;
 
   @GET
-  @Path("/")
+  @Path("pinot-controller/admin")
   @ApiOperation(value = "Check controller health")
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Good")})
   @Produces(MediaType.TEXT_PLAIN)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
index 01d5fdf..5717170 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
@@ -110,6 +110,8 @@ public class PinotTableConfigRestletResource {
             .type(MediaType.TEXT_PLAIN_TYPE).build();
       }
 
+      // TODO: Fix the bug - when schema is not configured, after deserialization, CombinedConfig will have a non-null
+      //       schema with null schema name
       if (config.getSchema() != null) {
         _resourceManager.addOrUpdateSchema(config.getSchema());
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index daf6f88..fe3491a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -62,7 +62,6 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.util.ReplicationUtils;
 import org.slf4j.LoggerFactory;
 
@@ -252,14 +251,16 @@ public class PinotTableRestletResource {
   @ApiOperation(value = "Deletes a table", notes = "Deletes a table")
   public SuccessResponse deleteTable(
       @ApiParam(value = "Name of the table to delete", required = true) @PathParam("tableName") String tableName,
-      @ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) {
+      @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
     List<String> tablesDeleted = new LinkedList<>();
     try {
-      if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name())) {
+      if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name()))
+          && !TableNameBuilder.REALTIME.tableHasTypeSuffix(tableName)) {
         _pinotHelixResourceManager.deleteOfflineTable(tableName);
         tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
       }
-      if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name())) {
+      if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name()))
+          && !TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableName)) {
         _pinotHelixResourceManager.deleteRealtimeTable(tableName);
         tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
       }
@@ -373,7 +374,6 @@ public class PinotTableRestletResource {
       throw new PinotHelixResourceManager.InvalidTableConfigException(errorMsg, e);
     }
 
-
     if (verifyReplication) {
       int requestReplication;
       try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5eeeddc..3c0ca0b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -160,10 +160,10 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
-        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_"
-            + controllerConf.getControllerPort(), controllerConf.getDataDir(),
-        controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
-        controllerConf.getEnableBatchMessageMode(), controllerConf.getHLCTablesAllowed());
+        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_" + controllerConf
+            .getControllerPort(), controllerConf.getDataDir(), controllerConf.getExternalViewOnlineToOfflineTimeout(),
+        controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
+        controllerConf.getHLCTablesAllowed());
   }
 
   /**
@@ -1087,9 +1087,6 @@ public class PinotHelixResourceManager {
         // lets add table configs
         ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
 
-        _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
-            new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
-
         // Update replica group partition assignment to the property store if applicable
         updateReplicaGroupPartitionAssignment(tableConfig);
         break;
@@ -1228,9 +1225,8 @@ public class PinotHelixResourceManager {
           servers = getInstancesWithTag(realtimeTagConfig.getConsumingServerTag());
         }
         int numReplicas = ReplicationUtils.getReplication(tableConfig);
-        ReplicaGroupPartitionAssignment partitionAssignment =
-            partitionAssignmentGenerator.buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig,
-                numReplicas, servers);
+        ReplicaGroupPartitionAssignment partitionAssignment = partitionAssignmentGenerator
+            .buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig, numReplicas, servers);
         partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment);
       }
     }
@@ -1272,7 +1268,8 @@ public class PinotHelixResourceManager {
     // Check if HLC table is allowed.
     StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
     if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
-      throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
+      throw new InvalidTableConfigException(
+          "Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
     }
   }
 
@@ -1468,8 +1465,8 @@ public class PinotHelixResourceManager {
     LOGGER.info("Deleting table {}: Finish", offlineTableName);
   }
 
-  public void deleteRealtimeTable(String rawTableName) {
-    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+  public void deleteRealtimeTable(String tableName) {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     LOGGER.info("Deleting table {}: Start", realtimeTableName);
 
     // Remove the table from brokerResource
@@ -2071,14 +2068,12 @@ public class PinotHelixResourceManager {
         : PinotResourceManagerResponse.failure("Timed out. External view not completely updated");
   }
 
-  public boolean hasRealtimeTable(String tableName) {
-    String actualTableName = tableName + "_REALTIME";
-    return getAllTables().contains(actualTableName);
+  public boolean hasOfflineTable(String tableName) {
+    return getAllResources().contains(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
   }
 
-  public boolean hasOfflineTable(String tableName) {
-    String actualTableName = tableName + "_OFFLINE";
-    return getAllTables().contains(actualTableName);
+  public boolean hasRealtimeTable(String tableName) {
+    return getAllResources().contains(TableNameBuilder.REALTIME.tableNameWithType(tableName));
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 46c84c0..6415b06 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -172,16 +172,17 @@ public class SegmentDeletionManager {
   }
 
   protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
-    final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    // Ignore HLC segments as they are not stored in Pinot FS
+    if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
+      return;
+    }
     if (_dataDir != null) {
-      URI fileToMoveURI;
-      PinotFS pinotFS;
-      URI dataDirURI = ControllerConf.getUriFromPath(_dataDir);
-      fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId);
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      URI fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId);
       URI deletedSegmentDestURI = ControllerConf
           .constructSegmentLocation(StringUtil.join(File.separator, _dataDir, DELETED_SEGMENTS), rawTableName,
               segmentId);
-      pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
+      PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());
 
       try {
         if (pinotFS.exists(fileToMoveURI)) {
@@ -197,9 +198,7 @@ public class SegmentDeletionManager {
                 deletedSegmentDestURI.toString());
           }
         } else {
-          if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
-            LOGGER.warn("Not found local segment file for segment {}" + fileToMoveURI.toString());
-          }
+          LOGGER.warn("Not found local segment file for segment {}", fileToMoveURI.toString());
         }
       } catch (IOException e) {
         LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 82c4cab..74682c8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -20,16 +20,10 @@ package org.apache.pinot.controller.helix.core.util;
 
 import com.google.common.base.Preconditions;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
-import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -37,16 +31,15 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -102,150 +95,103 @@ public class HelixSetupUtils {
    */
   public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel,
       boolean enableBatchMessageMode) {
-    final HelixAdmin admin = new ZKHelixAdmin(zkPath);
-    Preconditions.checkState(admin.getClusters().contains(helixClusterName),
-        String.format("Helix cluster: %s hasn't been set up", helixClusterName));
-
-    // Add segment state model definition if needed
-    addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, isUpdateStateModel);
-
-    // Add broker resource if needed
-    createBrokerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode);
-
-    // Add lead controller resource if needed
-    createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode);
-
-    // Init property store if needed
-    initPropertyStoreIfNeeded(helixClusterName, zkPath);
+    HelixZkClient zkClient = null;
+    try {
+      zkClient = SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkPath),
+          new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()).setConnectInitTimeout(
+              TimeUnit.SECONDS.toMillis(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC)));
+      zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
+      HelixAdmin helixAdmin = new ZKHelixAdmin(zkClient);
+      HelixDataAccessor helixDataAccessor =
+          new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
+
+      Preconditions.checkState(helixAdmin.getClusters().contains(helixClusterName),
+          String.format("Helix cluster: %s hasn't been set up", helixClusterName));
+
+      // Add segment state model definition if needed
+      addSegmentStateModelDefinitionIfNeeded(helixClusterName, helixAdmin, helixDataAccessor, isUpdateStateModel);
+
+      // Add broker resource if needed
+      createBrokerResourceIfNeeded(helixClusterName, helixAdmin, enableBatchMessageMode);
+
+      // Add lead controller resource if needed
+      createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, enableBatchMessageMode);
+    } finally {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
   }
 
-  private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin admin, String zkPath,
-      boolean isUpdateStateModel) {
-    final String segmentStateModelName =
+  private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
+      HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) {
+    String segmentStateModelName =
         PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition stateModelDefinition = admin.getStateModelDef(helixClusterName, segmentStateModelName);
-    if (stateModelDefinition == null) {
-      LOGGER.info("Adding state model {} (with CONSUMED state) generated using {}", segmentStateModelName,
-          PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
-      admin.addStateModelDef(helixClusterName, segmentStateModelName,
-          PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-    } else if (isUpdateStateModel) {
-      final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName);
-      List<String> states = curStateModelDef.getStatesPriorityList();
-      if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) {
-        LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName);
+    StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+    if (stateModelDefinition == null || isUpdateStateModel) {
+      if (stateModelDefinition == null) {
+        LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName);
       } else {
-        LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName);
-        StateModelDefinition newStateModelDef =
-            PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
-        ZkClient zkClient = new ZkClient(zkPath);
-        zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
-        zkClient.setZkSerializer(new ZNRecordSerializer());
-        HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
-        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-        accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef);
-        LOGGER.info("Completed updating state model {}", segmentStateModelName);
-        zkClient.close();
+        LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName);
       }
+      helixDataAccessor
+          .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
   }
 
-  private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+  private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
       boolean enableBatchMessageMode) {
     // Add broker resource online offline state model definition if needed
-    StateModelDefinition brokerResourceStateModelDefinition = admin.getStateModelDef(helixClusterName,
+    StateModelDefinition brokerResourceStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName,
         PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL);
     if (brokerResourceStateModelDefinition == null) {
       LOGGER.info("Adding state model definition named : {} generated using : {}",
           PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
           PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString());
-      admin.addStateModelDef(helixClusterName,
+      helixAdmin.addStateModelDef(helixClusterName,
           PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
           PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
 
     // Create broker resource if needed.
-    IdealState brokerResourceIdealState =
-        admin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
-    if (brokerResourceIdealState == null) {
+    if (helixAdmin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE) == null) {
       LOGGER.info("Adding empty ideal state for Broker!");
-      HelixHelper
-          .updateResourceConfigsFor(new HashMap<>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName,
-              admin);
-      IdealState idealState = PinotTableIdealStateBuilder
-          .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode);
-      admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
+      IdealState emptyIdealStateForBrokerResource = PinotTableIdealStateBuilder
+          .buildEmptyIdealStateForBrokerResource(helixAdmin, helixClusterName, enableBatchMessageMode);
+      helixAdmin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
+          emptyIdealStateForBrokerResource);
     }
   }
 
-  private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+  private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
       boolean enableBatchMessageMode) {
-    StateModelDefinition masterSlaveStateModelDefinition =
-        admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name);
-    if (masterSlaveStateModelDefinition == null) {
-      LOGGER.info("Adding state model definition named : {} generated using : {}", MasterSlaveSMD.name,
-          MasterSlaveSMD.class.toString());
-      admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name, MasterSlaveSMD.build());
-    }
-
-    IdealState leadControllerResourceIdealState =
-        admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
-    if (leadControllerResourceIdealState == null) {
+    if (helixAdmin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME) == null) {
       LOGGER.info("Cluster {} doesn't contain {}. Creating one.", helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
-      HelixHelper.updateResourceConfigsFor(new HashMap<>(), LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin);
-      // FULL-AUTO Master-Slave state model with CrushED reBalance strategy.
-      admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
-          CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, MasterSlaveSMD.name,
-          IdealState.RebalanceMode.FULL_AUTO.toString(), CrushEdRebalanceStrategy.class.getName());
 
+      // FULL-AUTO Master-Slave state model with CrushED rebalance strategy.
+      IdealState leadControllerResourceIdealState = new IdealState(LEAD_CONTROLLER_RESOURCE_NAME);
+      leadControllerResourceIdealState
+          .setNumPartitions(CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+      leadControllerResourceIdealState.setStateModelDefRef(MasterSlaveSMD.name);
+      leadControllerResourceIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+      leadControllerResourceIdealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+      leadControllerResourceIdealState.setReplicas("0");
       // Set instance group tag for lead controller resource.
-      IdealState leadControllerIdealState =
-          admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
-      leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
-      leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode);
+      leadControllerResourceIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+      leadControllerResourceIdealState.setBatchMessageMode(enableBatchMessageMode);
       // The below config guarantees if active number of replicas is no less than minimum active replica, there will not be partition movements happened.
       // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix controller waits at most 5 minutes and then re-calculate the participant assignment.
       // This delay is helpful when periodic tasks are running and we don't want them to be re-run too frequently.
       // Plus, if virtual id is applied to controller hosts, swapping hosts would be easy as new hosts can use the same virtual id and it takes least effort to change the configs.
-      leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
-      leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
-      leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
-      admin.setResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState);
-
+      leadControllerResourceIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
+      leadControllerResourceIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
+      leadControllerResourceIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
       // Explicitly disable this resource when creating this new resource.
       // When all the controllers are running the code with the logic to handle this resource, it can be enabled for backward compatibility.
       // In the next major release, we can enable this resource by default, so that all the controller logic can be separated.
-      admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, false);
+      leadControllerResourceIdealState.enable(false);
 
-      LOGGER.info("Re-balance lead controller resource with replicas: {}",
-          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
-      // Set it to 1 so that there's only 1 instance (i.e. master) shown in every partitions.
-      admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
-          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
-    }
-  }
-
-  private static void initPropertyStoreIfNeeded(String helixClusterName, String zkPath) {
-    String propertyStorePath = PropertyPathBuilder.propertyStore(helixClusterName);
-    ZkHelixPropertyStore<ZNRecord> propertyStore =
-        new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(), propertyStorePath);
-    if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) {
-      propertyStore.create("/CONFIGS", new ZNRecord(""), AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) {
-      propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) {
-      propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) {
-      propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) {
-      propertyStore.create("/SCHEMAS", new ZNRecord(""), AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) {
-      propertyStore.create("/SEGMENTS", new ZNRecord(""), AccessOption.PERSISTENT);
+      helixAdmin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerResourceIdealState);
     }
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..efc1640 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -105,14 +105,16 @@ public class StorageQuotaChecker {
 
     if (quotaConfig == null || Strings.isNullOrEmpty(quotaConfig.getStorage())) {
       // no quota configuration...so ignore for backwards compatibility
-      LOGGER.warn("Quota configuration not set for table: {}", tableNameWithType);
-      return success("Quota configuration not set for table: " + tableNameWithType);
+      LOGGER.info("Storage quota is not configured for table: {}, skipping the check", tableNameWithType);
+      return success("Storage quota is not configured for table: " + tableNameWithType);
     }
 
     long allowedStorageBytes = numReplicas * quotaConfig.storageSizeBytes();
-    if (allowedStorageBytes < 0) {
-      LOGGER.warn("Storage quota is not configured for table: {}", tableNameWithType);
-      return success("Storage quota is not configured for table: " + tableNameWithType);
+    if (allowedStorageBytes <= 0) {
+      LOGGER.warn("Invalid storage quota: {} for table: {}, skipping the check", quotaConfig.getStorage(),
+          tableNameWithType);
+      return success(
+          String.format("Invalid storage quota: %s for table: %s", quotaConfig.getStorage(), tableNameWithType));
     }
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_QUOTA, allowedStorageBytes);
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
index 8f72a59..59d5003 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
@@ -30,7 +30,6 @@ import org.testng.annotations.Test;
 
 
 public class ControllerPeriodicTaskStarterTest extends ControllerTest {
-  private MockControllerStarter _mockControllerStarter;
 
   @BeforeClass
   public void setup() {
@@ -51,27 +50,11 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest {
   }
 
   @Override
-  protected void startControllerStarter(ControllerConf config) {
-    _mockControllerStarter = new MockControllerStarter(config);
-    _mockControllerStarter.start();
-    _helixResourceManager = _mockControllerStarter.getHelixResourceManager();
-    _helixManager = _mockControllerStarter.getHelixControllerManager();
+  protected ControllerStarter getControllerStarter(ControllerConf config) {
+    return new MockControllerStarter(config);
   }
 
-  @Override
-  protected void stopControllerStarter() {
-    Assert.assertNotNull(_mockControllerStarter);
-
-    _mockControllerStarter.stop();
-    _mockControllerStarter = null;
-  }
-
-  @Override
-  protected ControllerStarter getControllerStarter() {
-    return _mockControllerStarter;
-  }
-
-  private class MockControllerStarter extends TestOnlyControllerStarter {
+  private class MockControllerStarter extends ControllerStarter {
 
     private static final int NUM_PERIODIC_TASKS = 7;
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 9893377..9eadb70 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix;
 
+import com.google.common.base.Preconditions;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -38,7 +39,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.data.DimensionFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
@@ -66,7 +66,6 @@ public abstract class ControllerTest {
   protected ControllerRequestURLBuilder _controllerRequestURLBuilder;
   protected String _controllerDataDir;
 
-  protected ZkClient _zkClient;
   protected ControllerStarter _controllerStarter;
   protected PinotHelixResourceManager _helixResourceManager;
   protected HelixManager _helixManager;
@@ -95,58 +94,33 @@ public abstract class ControllerTest {
     }
   }
 
-  public static ControllerConf getDefaultControllerConfiguration() {
+  public ControllerConf getDefaultControllerConfiguration() {
     ControllerConf config = new ControllerConf();
     config.setControllerHost(LOCAL_HOST);
     config.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT));
     config.setDataDir(DEFAULT_DATA_DIR);
     config.setZkStr(ZkStarter.DEFAULT_ZK_STR);
+    config.setHelixClusterName(getHelixClusterName());
 
     return config;
   }
 
-  public class TestOnlyControllerStarter extends ControllerStarter {
-
-    TestOnlyControllerStarter(ControllerConf conf) {
-      super(conf);
-    }
-
-    @Override
-    public boolean isPinotOnlyModeSupported() {
-      return true;
-    }
-  }
-
   protected void startController() {
     startController(getDefaultControllerConfiguration());
   }
 
   protected void startController(ControllerConf config) {
-    startController(config, true);
-  }
-
-  protected void startController(ControllerConf config, boolean deleteCluster) {
-    Assert.assertNotNull(config);
-    Assert.assertNull(_controllerStarter);
+    Preconditions.checkState(_controllerStarter == null);
 
     _controllerPort = Integer.valueOf(config.getControllerPort());
     _controllerBaseApiUrl = "http://localhost:" + _controllerPort;
     _controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
     _controllerDataDir = config.getDataDir();
 
-    String helixClusterName = getHelixClusterName();
-    config.setHelixClusterName(helixClusterName);
-
-    String zkStr = config.getZkStr();
-    _zkClient = new ZkClient(zkStr);
-    if (_zkClient.exists("/" + helixClusterName) && deleteCluster) {
-      _zkClient.deleteRecursive("/" + helixClusterName);
-    }
-
     startControllerStarter(config);
 
     // HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode.
-    switch (getControllerStarter().getControllerMode()) {
+    switch (_controllerStarter.getControllerMode()) {
       case DUAL:
       case PINOT_ONLY:
         _helixAdmin = _helixResourceManager.getHelixAdmin();
@@ -160,16 +134,19 @@ public abstract class ControllerTest {
   }
 
   protected void startControllerStarter(ControllerConf config) {
-    _controllerStarter = new TestOnlyControllerStarter(config);
+    _controllerStarter = getControllerStarter(config);
     _controllerStarter.start();
     _helixResourceManager = _controllerStarter.getHelixResourceManager();
     _helixManager = _controllerStarter.getHelixControllerManager();
   }
 
+  protected ControllerStarter getControllerStarter(ControllerConf config) {
+    return new ControllerStarter(config);
+  }
+
   protected void stopController() {
     stopControllerStarter();
     FileUtils.deleteQuietly(new File(_controllerDataDir));
-    _zkClient.close();
   }
 
   protected void stopControllerStarter() {
@@ -179,10 +156,6 @@ public abstract class ControllerTest {
     _controllerStarter = null;
   }
 
-  protected ControllerStarter getControllerStarter() {
-    return _controllerStarter;
-  }
-
   protected Schema createDummySchema(String tableName) {
     Schema schema = new Schema();
     schema.setSchemaName(tableName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index d91e612..bcd9bf8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -78,12 +78,11 @@ public class PinotControllerModeTest extends ControllerTest {
 
     // Starting a second dual-mode controller. Helix cluster has already been set up.
     ControllerConf controllerConfig = getDefaultControllerConfiguration();
-    controllerConfig.setHelixClusterName(getHelixClusterName());
     controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
     controllerConfig.setControllerPort(
         Integer.toString(Integer.parseInt(this.config.getControllerPort()) + controllerPortOffset++));
 
-    ControllerStarter secondDualModeController = new TestOnlyControllerStarter(controllerConfig);
+    ControllerStarter secondDualModeController = getControllerStarter(controllerConfig);
     secondDualModeController.start();
     TestUtils
         .waitForCondition(aVoid -> secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(),
@@ -113,7 +112,6 @@ public class PinotControllerModeTest extends ControllerTest {
 
     // Starting a helix controller.
     ControllerConf config2 = getDefaultControllerConfiguration();
-    config2.setHelixClusterName(getHelixClusterName());
     config2.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
     config2.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
     ControllerStarter helixControllerStarter = new ControllerStarter(config2);
@@ -128,11 +126,10 @@ public class PinotControllerModeTest extends ControllerTest {
 
     // Starting a pinot only controller.
     ControllerConf config3 = getDefaultControllerConfiguration();
-    config3.setHelixClusterName(getHelixClusterName());
     config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
     config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
 
-    ControllerStarter firstPinotOnlyController = new TestOnlyControllerStarter(config3);
+    ControllerStarter firstPinotOnlyController = getControllerStarter(config3);
     firstPinotOnlyController.start();
     PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager =
         firstPinotOnlyController.getHelixResourceManager();
@@ -143,11 +140,10 @@ public class PinotControllerModeTest extends ControllerTest {
 
     // Start a second Pinot only controller.
     ControllerConf config4 = getDefaultControllerConfiguration();
-    config4.setHelixClusterName(getHelixClusterName());
     config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
     config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
 
-    ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config4);
+    ControllerStarter secondControllerStarter = getControllerStarter(config4);
     secondControllerStarter.start();
     // Two controller instances assigned to cluster.
     TestUtils
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 914adda..1a3b7e6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,7 +71,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
   protected final File _avroDir = new File(_tempDir, "avroDir");
-  protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
   protected List<KafkaServerStartable> _kafkaStarters;
@@ -184,7 +183,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   @Nullable
-  protected  String getServerTenant() {
+  protected String getServerTenant() {
     return TagNameUtils.DEFAULT_TENANT_NAME;
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 9ae4eef..bd8ab1a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -19,8 +19,8 @@
 package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
@@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.plan.SelectionPlanNode;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -611,13 +610,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     pqlQuery = "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response2 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+        + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response3 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+        + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response4 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
+    pqlQuery =
+        "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
     JsonNode response5 = postQuery(pqlQuery);
 
     double val1 = response1.get("aggregationResults").get(0).get("value").asDouble();
@@ -653,7 +655,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       double val2 = response2.get("aggregationResults").get(0).get("value").asDouble();
       Assert.assertEquals(val1, val2);
     }
-
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org