You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/05/15 03:26:48 UTC

[incubator-pinot] branch master updated: Clean up all integration tests and standardize the creation of schema, table config and segments (#5385)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 46e2701  Clean up all integration tests and standardize the creation of schema, table config and segments (#5385)
46e2701 is described below

commit 46e2701230530cd93491c53a6f83caad09bbc97c
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu May 14 20:26:38 2020 -0700

    Clean up all integration tests and standardize the creation of schema, table config and segments (#5385)
    
    - Create schema and table config based on the settings in the BaseClusterIntegrationTest
    - Remove the methods with huge arguments list
    - Always use the schema and table config in the cluster to create the segments
    - Simplify the cluster setup methods
    
    The following integration tests are removed:
    - DefaultCommitterRealtimeIntegrationTest: Merged into LLCRealtimeClusterIntegrationTest
    - PinotURIUploadIntegrationTest: Already covered in the module-level FileUploadDownloadClientTest
    
    ControllerPeriodicTasksIntegrationTest is refactored (was not running before because the wrong test name '*Tests')
    RealtimeStressTest is removed as it is the same as BenchmarkRealtimeConsumptionSpeed
---
 .../common/utils/FileUploadDownloadClient.java     |  10 +-
 .../pinot/controller/helix/ControllerTest.java     |  58 +-
 .../generator/SegmentGeneratorConfig.java          |   5 +
 .../tests/BaseClusterIntegrationTest.java          | 305 ++++++-----
 .../tests/BaseClusterIntegrationTestSet.java       |   6 +-
 .../tests/ClusterIntegrationTestUtils.java         | 151 +++---
 .../pinot/integration/tests/ClusterTest.java       | 252 +--------
 .../ControllerPeriodicTasksIntegrationTest.java    | 379 +++++++++++++
 .../ControllerPeriodicTasksIntegrationTests.java   | 586 ---------------------
 ...vertToRawIndexMinionClusterIntegrationTest.java |   9 +-
 .../DefaultCommitterRealtimeIntegrationTest.java   | 202 -------
 ...lakyConsumerRealtimeClusterIntegrationTest.java |   8 -
 .../tests/HybridClusterIntegrationTest.java        |  86 ++-
 ...ridClusterIntegrationTestCommandLineRunner.java | 138 +++--
 .../tests/JsonPathClusterIntegrationTest.java      | 172 ++----
 .../tests/LLCRealtimeClusterIntegrationTest.java   |  63 +--
 .../LuceneRealtimeClusterIntegrationTest.java      | 230 ++++----
 .../tests/MapTypeClusterIntegrationTest.java       |  56 +-
 ...onaryAggregationPlanClusterIntegrationTest.java | 411 +++++----------
 .../tests/OfflineClusterIntegrationTest.java       | 179 +++----
 .../tests/PinotURIUploadIntegrationTest.java       | 273 ----------
 .../tests/RealtimeClusterIntegrationTest.java      |  25 +-
 .../tests/SegmentCompletionIntegrationTest.java    |  15 +-
 .../tests/SimpleMinionClusterIntegrationTest.java  |  12 +-
 .../tests/StarTreeClusterIntegrationTest.java      | 123 ++---
 .../tests/UploadRefreshDeleteIntegrationTest.java  | 296 -----------
 .../apache/pinot/server/util/SegmentTestUtils.java |  68 ---
 pinot-perf/pom.xml                                 |   5 -
 .../perf/BenchmarkRealtimeConsumptionSpeed.java    |  69 +--
 .../org/apache/pinot/perf/RealtimeStressTest.java  | 129 -----
 30 files changed, 1402 insertions(+), 2919 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 9172fa3..4e9a303 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -497,20 +498,19 @@ public class FileUploadDownloadClient implements Closeable {
    * @param uri URI
    * @param segmentName Segment name
    * @param segmentFile Segment file
-   * @param rawTableName Raw table name
+   * @param tableName Table name with or without type suffix
    * @return Response
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String rawTableName)
+  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String tableName)
       throws IOException, HttpErrorStatusException {
     // Add table name as a request parameter
-    NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, rawTableName);
-    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+    NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, tableName);
+    List<NameValuePair> parameters = Collections.singletonList(tableNameValuePair);
     return uploadSegment(uri, segmentName, segmentFile, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
-
   /**
    * Upload segment with segment file input stream.
    *
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index e02ff42..89c365d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -65,12 +65,15 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerStarter;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -81,6 +84,8 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLE
 import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
 import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
 import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
 
 /**
@@ -407,7 +412,58 @@ public abstract class ControllerTest {
       throws IOException {
     String url = _controllerRequestURLBuilder.forSchemaCreate();
     PostMethod postMethod = sendMultipartPostRequest(url, schema.toSingleLineJsonString());
-    Assert.assertEquals(postMethod.getStatusCode(), 200);
+    assertEquals(postMethod.getStatusCode(), 200);
+  }
+
+  protected Schema getSchema(String schemaName) {
+    Schema schema = _helixResourceManager.getSchema(schemaName);
+    assertNotNull(schema);
+    return schema;
+  }
+
+  protected void addTableConfig(TableConfig tableConfig)
+      throws IOException {
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
+  }
+
+  protected void updateTableConfig(TableConfig tableConfig)
+      throws IOException {
+    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableConfig.getTableName()),
+        tableConfig.toJsonString());
+  }
+
+  protected TableConfig getOfflineTableConfig(String tableName) {
+    TableConfig offlineTableConfig = _helixResourceManager.getOfflineTableConfig(tableName);
+    Assert.assertNotNull(offlineTableConfig);
+    return offlineTableConfig;
+  }
+
+  protected TableConfig getRealtimeTableConfig(String tableName) {
+    TableConfig realtimeTableConfig = _helixResourceManager.getRealtimeTableConfig(tableName);
+    Assert.assertNotNull(realtimeTableConfig);
+    return realtimeTableConfig;
+  }
+
+  protected void dropOfflineTable(String tableName)
+      throws IOException {
+    sendDeleteRequest(
+        _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.OFFLINE.tableNameWithType(tableName)));
+  }
+
+  protected void dropRealtimeTable(String tableName)
+      throws IOException {
+    sendDeleteRequest(
+        _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
+  }
+
+  protected void reloadOfflineTable(String tableName)
+      throws IOException {
+    sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE.name()), null);
+  }
+
+  protected void reloadRealtimeTable(String tableName)
+      throws IOException {
+    sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.REALTIME.name()), null);
   }
 
   protected String getBrokerTenantRequestPayload(String tenantName, int numBrokers) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 8fa4835..7126ca4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -130,6 +130,11 @@ public class SegmentGeneratorConfig {
 
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     if (indexingConfig != null) {
+      String segmentVersion = indexingConfig.getSegmentFormatVersion();
+      if (segmentVersion != null) {
+        _segmentVersion = SegmentVersion.valueOf(segmentVersion);
+      }
+
       List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
       Map<String, String> noDictionaryColumnMap = indexingConfig.getNoDictionaryConfig();
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 4e74c42..074d2d3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -20,28 +20,31 @@ package org.apache.pinot.integration.tests;
 
 import com.google.common.base.Function;
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.common.utils.config.TagNameUtils;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -53,29 +56,30 @@ import org.testng.Assert;
 public abstract class BaseClusterIntegrationTest extends ClusterTest {
 
   // Default settings
-  private static final String DEFAULT_TABLE_NAME = "mytable";
-  private static final String DEFAULT_SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
-  private static final String DEFAULT_TIME_COLUMN_NAME = "DaysSinceEpoch";
-  private static final String DEFAULT_AVRO_TAR_FILE_NAME =
+  protected static final String DEFAULT_TABLE_NAME = "mytable";
+  protected static final String DEFAULT_SCHEMA_NAME = "mytable";
+  protected static final String DEFAULT_SCHEMA_FILE_NAME =
+      "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
+  protected static final String DEFAULT_TIME_COLUMN_NAME = "DaysSinceEpoch";
+  protected static final String DEFAULT_AVRO_TAR_FILE_NAME =
       "On_Time_On_Time_Performance_2014_100k_subset_nonulls.tar.gz";
-  private static final long DEFAULT_COUNT_STAR_RESULT = 115545L;
-  private static final int DEFAULT_LLC_SEGMENT_FLUSH_SIZE = 5000;
-  private static final int DEFAULT_HLC_SEGMENT_FLUSH_SIZE = 20000;
-  private static final int DEFAULT_LLC_NUM_KAFKA_BROKERS = 2;
-  private static final int DEFAULT_HLC_NUM_KAFKA_BROKERS = 1;
-  private static final int DEFAULT_LLC_NUM_KAFKA_PARTITIONS = 2;
-  private static final int DEFAULT_HLC_NUM_KAFKA_PARTITIONS = 10;
-  private static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
-  private static final String DEFAULT_SORTED_COLUMN = "Carrier";
-  private static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
-  private static final List<String> DEFAULT_RAW_INDEX_COLUMNS =
+  protected static final long DEFAULT_COUNT_STAR_RESULT = 115545L;
+  protected static final int DEFAULT_LLC_SEGMENT_FLUSH_SIZE = 5000;
+  protected static final int DEFAULT_HLC_SEGMENT_FLUSH_SIZE = 20000;
+  protected static final int DEFAULT_LLC_NUM_KAFKA_BROKERS = 2;
+  protected static final int DEFAULT_HLC_NUM_KAFKA_BROKERS = 1;
+  protected static final int DEFAULT_LLC_NUM_KAFKA_PARTITIONS = 2;
+  protected static final int DEFAULT_HLC_NUM_KAFKA_PARTITIONS = 10;
+  protected static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
+  protected static final List<String> DEFAULT_NO_DICTIONARY_COLUMNS =
       Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
+  protected static final String DEFAULT_SORTED_COLUMN = "Carrier";
+  protected static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
   private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
-  private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Arrays.asList("", "Origin");
+  private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Collections.singletonList("Origin");
+  protected static final int DEFAULT_NUM_REPLICAS = 1;
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
-  protected final File _avroDir = new File(_tempDir, "avroDir");
-  protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
   protected List<StreamDataServerStartable> _kafkaStarters;
@@ -87,14 +91,20 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   /**
    * The following getters can be overridden to change default settings.
    */
+
   protected String getTableName() {
     return DEFAULT_TABLE_NAME;
   }
 
+  protected String getSchemaName() {
+    return DEFAULT_SCHEMA_NAME;
+  }
+
   protected String getSchemaFileName() {
     return DEFAULT_SCHEMA_FILE_NAME;
   }
 
+  @Nullable
   protected String getTimeColumnName() {
     return DEFAULT_TIME_COLUMN_NAME;
   }
@@ -107,10 +117,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     return DEFAULT_COUNT_STAR_RESULT;
   }
 
-  protected String getKafkaTopic() {
-    return getClass().getSimpleName();
-  }
-
   protected boolean useLlc() {
     return false;
   }
@@ -135,6 +141,14 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     }
   }
 
+  protected int getBaseKafkaPort() {
+    return KafkaStarterUtils.DEFAULT_KAFKA_PORT;
+  }
+
+  protected String getKafkaZKAddress() {
+    return KafkaStarterUtils.DEFAULT_ZK_STR;
+  }
+
   protected int getNumKafkaPartitions() {
     if (useLlc()) {
       return DEFAULT_LLC_NUM_KAFKA_PARTITIONS;
@@ -143,6 +157,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     }
   }
 
+  protected String getKafkaTopic() {
+    return getClass().getSimpleName();
+  }
+
   protected int getMaxNumKafkaMessagesPerBatch() {
     return DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH;
   }
@@ -168,18 +186,34 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   @Nullable
-  protected List<String> getBloomFilterIndexColumns() {
-    return DEFAULT_BLOOM_FILTER_COLUMNS;
+  protected List<String> getNoDictionaryColumns() {
+    return DEFAULT_NO_DICTIONARY_COLUMNS;
   }
 
   @Nullable
   protected List<String> getRangeIndexColumns() {
-    return DEFAULT_RANGE_INDEX_COLUMNS;
+    return null;
+    // TODO: Fix the JVM crash then uncomment this line
+//    return DEFAULT_RANGE_INDEX_COLUMNS;
+  }
+
+  @Nullable
+  protected List<String> getBloomFilterColumns() {
+    return DEFAULT_BLOOM_FILTER_COLUMNS;
   }
 
   @Nullable
-  protected List<String> getRawIndexColumns() {
-    return DEFAULT_RAW_INDEX_COLUMNS;
+  protected List<FieldConfig> getFieldConfigs() {
+    return null;
+  }
+
+  protected int getNumReplicas() {
+    return DEFAULT_NUM_REPLICAS;
+  }
+
+  @Nullable
+  protected String getSegmentVersion() {
+    return null;
   }
 
   @Nullable
@@ -193,23 +227,114 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   @Nullable
-  protected String getServerTenant() {
+  protected String getBrokerTenant() {
     return TagNameUtils.DEFAULT_TENANT_NAME;
   }
 
   @Nullable
-  protected String getBrokerTenant() {
+  protected String getServerTenant() {
     return TagNameUtils.DEFAULT_TENANT_NAME;
   }
 
-  protected SegmentPartitionConfig getSegmentPartitionConfig() {
+  /**
+   * The following methods are based on the getters. Override the getters for non-default settings before calling these
+   * methods.
+   */
+
+  /**
+   * Creates a new schema.
+   */
+  protected Schema createSchema()
+      throws IOException {
+    URL resourceUrl = BaseClusterIntegrationTest.class.getClassLoader().getResource(getSchemaFileName());
+    Assert.assertNotNull(resourceUrl);
+    return Schema.fromFile(new File(resourceUrl.getFile()));
+  }
+
+  /**
+   * Returns the schema in the cluster.
+   */
+  protected Schema getSchema() {
+    return getSchema(getSchemaName());
+  }
+
+  /**
+   * Creates a new OFFLINE table config.
+   */
+  protected TableConfig createOfflineTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setSchemaName(getSchemaName())
+        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+        .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+        .setServerTenant(getServerTenant()).build();
+  }
 
-    ColumnPartitionConfig columnPartitionConfig = new ColumnPartitionConfig("murmur", 2);
-    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
-    columnPartitionConfigMap.put("AirlineID", columnPartitionConfig);
+  /**
+   * Returns the OFFLINE table config in the cluster.
+   */
+  protected TableConfig getOfflineTableConfig() {
+    return getOfflineTableConfig(getTableName());
+  }
+
+  /**
+   * Creates a new REALTIME table config.
+   */
+  protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+    Map<String, String> streamConfigs = new HashMap<>();
+    String streamType = "kafka";
+    streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
+    boolean useLlc = useLlc();
+    if (useLlc) {
+      // LLC
+      streamConfigs
+          .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+              StreamConfig.ConsumerType.LOWLEVEL.toString());
+      streamConfigs.put(KafkaStreamConfigProperties
+              .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST),
+          KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    } else {
+      // HLC
+      streamConfigs
+          .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+              StreamConfig.ConsumerType.HIGHLEVEL.toString());
+      streamConfigs.put(KafkaStreamConfigProperties
+              .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_ZK_CONNECTION_STRING),
+          KafkaStarterUtils.DEFAULT_ZK_STR);
+      streamConfigs.put(KafkaStreamConfigProperties
+              .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER),
+          KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    }
+    streamConfigs.put(StreamConfigProperties
+            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
+        getStreamConsumerFactoryClassName());
+    streamConfigs
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
+            getKafkaTopic());
+    AvroFileSchemaKafkaAvroMessageDecoder.avroFile = sampleAvroFile;
+    streamConfigs
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+            AvroFileSchemaKafkaAvroMessageDecoder.class.getName());
+    streamConfigs
+        .put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(getRealtimeSegmentFlushSize()));
+    streamConfigs.put(StreamConfigProperties
+        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
+
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
+        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+        .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+        .setServerTenant(getServerTenant()).setLLC(useLlc).setStreamConfigs(streamConfigs).build();
+  }
 
-    SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(columnPartitionConfigMap);
-    return segmentPartitionConfig;
+  /**
+   * Returns the REALTIME table config in the cluster.
+   */
+  protected TableConfig getRealtimeTableConfig() {
+    return getRealtimeTableConfig(getTableName());
   }
 
   /**
@@ -245,55 +370,23 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   /**
-   * Set up H2 connection to a table with pre-loaded data.
-   *
-   * @param avroFiles List of Avro files to be loaded.
-   * @param executor Executor
-   * @throws Exception
+   * Sets up the H2 connection to a table with pre-loaded data.
    */
-  protected void setUpH2Connection(final List<File> avroFiles, Executor executor)
+  protected void setUpH2Connection(List<File> avroFiles)
       throws Exception {
     Assert.assertNull(_h2Connection);
     Class.forName("org.h2.Driver");
     _h2Connection = DriverManager.getConnection("jdbc:h2:mem:");
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          ClusterIntegrationTestUtils.setUpH2TableWithAvro(avroFiles, getTableName(), _h2Connection);
-        } catch (Exception e) {
-          // Ignored
-        }
-      }
-    });
+    ClusterIntegrationTestUtils.setUpH2TableWithAvro(avroFiles, getTableName(), _h2Connection);
   }
 
   /**
-   * Set up query generator using the given Avro files.
-   *
-   * @param avroFiles List of Avro files
-   * @param executor Executor
+   * Sets up the query generator using the given Avro files.
    */
-  protected void setUpQueryGenerator(final List<File> avroFiles, Executor executor) {
+  protected void setUpQueryGenerator(List<File> avroFiles) {
     Assert.assertNull(_queryGenerator);
-    final String tableName = getTableName();
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        _queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
-      }
-    });
-  }
-
-  /**
-   * Get the schema file.
-   *
-   * @return Schema file
-   */
-  protected File getSchemaFile() {
-    URL resourceUrl = BaseClusterIntegrationTest.class.getClassLoader().getResource(getSchemaFileName());
-    Assert.assertNotNull(resourceUrl);
-    return new File(resourceUrl.getFile());
+    String tableName = getTableName();
+    _queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
   }
 
   /**
@@ -311,30 +404,19 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   /**
-   * Push the data in the given Avro files into a Kafka stream.
+   * Pushes the data in the given Avro files into a Kafka stream.
    *
    * @param avroFiles List of Avro files
-   * @param kafkaTopic Kafka topic
-   * @param executor Executor
    */
-  protected void pushAvroIntoKafka(final List<File> avroFiles, final String kafkaTopic, Executor executor) {
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
-              getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
-        } catch (Exception e) {
-          // Ignored
-        }
-      }
-    });
+  protected void pushAvroIntoKafka(List<File> avroFiles)
+      throws Exception {
+    ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, "localhost:" + getBaseKafkaPort(), getKafkaTopic(),
+        getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
   }
 
   protected void startKafka() {
-    _kafkaStarters = KafkaStarterUtils
-        .startServers(getNumKafkaBrokers(), KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
-            KafkaStarterUtils.getDefaultKafkaConfiguration());
+    _kafkaStarters = KafkaStarterUtils.startServers(getNumKafkaBrokers(), getBaseKafkaPort(), getKafkaZKAddress(),
+        KafkaStarterUtils.getDefaultKafkaConfiguration());
     _kafkaStarters.get(0)
         .createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
   }
@@ -407,31 +489,4 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     ClusterIntegrationTestUtils
         .testSqlQuery(pinotQuery, _brokerBaseApiUrl, getPinotConnection(), sqlQueries, getH2Connection());
   }
-
-  protected void setUpRealtimeTable(File avroFile, int numReplicas, boolean useLLC, String tableName)
-      throws Exception {
-    File schemaFile = getSchemaFile();
-    Schema schema = Schema.fromFile(schemaFile);
-    String schemaName = schema.getSchemaName();
-    addSchema(schemaFile, schemaName);
-
-    String timeColumnName = getTimeColumnName();
-    FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
-    Assert.assertNotNull(fieldSpec);
-    TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-    TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
-    Assert.assertNotNull(outgoingTimeUnit);
-    String timeType = outgoingTimeUnit.toString();
-
-    addRealtimeTable(tableName, useLLC, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
-        getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName,
-        getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(), getInvertedIndexColumns(),
-        getBloomFilterIndexColumns(), getRangeIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(),
-        numReplicas);
-  }
-
-  protected void setUpRealtimeTable(File avroFile)
-      throws Exception {
-    setUpRealtimeTable(avroFile, 1, useLlc(), getTableName());
-  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 89cda04..da4cf16 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -526,7 +526,7 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
   public void testReload(boolean includeOfflineTable)
       throws Exception {
     String rawTableName = getTableName();
-    Schema schema = Schema.fromFile(getSchemaFile());
+    Schema schema = getSchema();
 
     String selectStarQuery = "SELECT * FROM " + rawTableName;
     JsonNode queryResponse = postQuery(selectStarQuery);
@@ -554,9 +554,9 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
 
     // Reload the table
     if (includeOfflineTable) {
-      sendPostRequest(_controllerRequestURLBuilder.forTableReload(rawTableName, "OFFLINE"), null);
+      reloadOfflineTable(rawTableName);
     }
-    sendPostRequest(_controllerRequestURLBuilder.forTableReload(rawTableName, "REALTIME"), null);
+    reloadRealtimeTable(rawTableName);
 
     // Wait for all segments to finish reloading, and test querying the new columns
     // NOTE: Use count query to prevent schema inconsistency error
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 2782e54..64b1c47 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -40,9 +40,9 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
@@ -65,12 +65,12 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
-import org.apache.pinot.server.util.SegmentTestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.testng.Assert;
@@ -92,8 +92,7 @@ public class ClusterIntegrationTestUtils {
    * @throws Exception
    */
   @SuppressWarnings("SqlNoDataSourceInspection")
-  public static void setUpH2TableWithAvro(@Nonnull List<File> avroFiles, @Nonnull String tableName,
-      @Nonnull Connection h2Connection)
+  public static void setUpH2TableWithAvro(List<File> avroFiles, String tableName, Connection h2Connection)
       throws Exception {
     int numFields;
 
@@ -196,7 +195,7 @@ public class ClusterIntegrationTestUtils {
    * @param avroFieldType Avro field type
    * @return Whether the given Avro field type is a single value type (non-NULL)
    */
-  private static boolean isSingleValueAvroFieldType(@Nonnull Schema.Type avroFieldType) {
+  private static boolean isSingleValueAvroFieldType(Schema.Type avroFieldType) {
     return (avroFieldType == Schema.Type.BOOLEAN) || (avroFieldType == Schema.Type.INT) || (avroFieldType
         == Schema.Type.LONG) || (avroFieldType == Schema.Type.FLOAT) || (avroFieldType == Schema.Type.DOUBLE) || (
         avroFieldType == Schema.Type.STRING);
@@ -210,9 +209,7 @@ public class ClusterIntegrationTestUtils {
    * @param nullable Whether the column is nullable
    * @return H2 field name and type
    */
-  @Nonnull
-  private static String buildH2FieldNameAndType(@Nonnull String fieldName, @Nonnull Schema.Type avroFieldType,
-      boolean nullable) {
+  private static String buildH2FieldNameAndType(String fieldName, Schema.Type avroFieldType, boolean nullable) {
     String avroFieldTypeName = avroFieldType.getName();
     String h2FieldType;
     switch (avroFieldTypeName) {
@@ -234,74 +231,68 @@ public class ClusterIntegrationTestUtils {
   }
 
   /**
-   * Builds segments from the given Avro files. Each segment will be built using a separate thread.
-   *  @param avroFiles List of Avro files
+   * Builds Pinot segments from the given Avro files. Each segment will be built using a separate thread.
+   *
+   * @param avroFiles List of Avro files
+   * @param tableConfig Pinot table config
+   * @param schema Pinot schema
    * @param baseSegmentIndex Base segment index number
    * @param segmentDir Output directory for the un-tarred segments
    * @param tarDir Output directory for the tarred segments
-   * @param tableName Table name
-   * @param starTreeV2BuilderConfigs List of star-tree V2 builder configs
-   * @param rawIndexColumns Columns to create raw index with
-   * @param pinotSchema Pinot schema
-   * @param executor Executor
    */
-  public static void buildSegmentsFromAvro(List<File> avroFiles, int baseSegmentIndex, File segmentDir, File tarDir,
-      String tableName, @Nullable List<StarTreeV2BuilderConfig> starTreeV2BuilderConfigs,
-      @Nullable List<String> rawIndexColumns, @Nullable org.apache.pinot.spi.data.Schema pinotSchema,
-      Executor executor) {
-    int numSegments = avroFiles.size();
-    for (int i = 0; i < numSegments; i++) {
-      final File avroFile = avroFiles.get(i);
-      final int segmentIndex = i + baseSegmentIndex;
-      executor.execute(() -> {
-        try {
-          File outputDir = new File(segmentDir, "segment-" + segmentIndex);
-          SegmentGeneratorConfig segmentGeneratorConfig =
-              SegmentTestUtils.getSegmentGeneratorConfig(avroFile, outputDir, TimeUnit.DAYS, tableName, pinotSchema);
-
-          // Test segment with space and special character in the file name
-          segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %");
-
-          if (starTreeV2BuilderConfigs != null) {
-            segmentGeneratorConfig.setStarTreeV2BuilderConfigs(starTreeV2BuilderConfigs);
-          }
-
-          if (rawIndexColumns != null) {
-            segmentGeneratorConfig.setRawIndexCreationColumns(rawIndexColumns);
-          }
-
-          // Build the segment
-          SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-          driver.init(segmentGeneratorConfig);
-          driver.build();
-
-          // Tar the segment
-          File[] files = outputDir.listFiles();
-          Assert.assertNotNull(files);
-          File segmentFile = files[0];
-          String segmentName = segmentFile.getName();
-          TarGzCompressionUtils
-              .createTarGzOfDirectory(segmentFile.getAbsolutePath(), new File(tarDir, segmentName).getAbsolutePath());
-        } catch (Exception e) {
-          // Ignored
-        }
-      });
+  public static void buildSegmentsFromAvro(List<File> avroFiles, TableConfig tableConfig,
+      org.apache.pinot.spi.data.Schema schema, int baseSegmentIndex, File segmentDir, File tarDir)
+      throws Exception {
+    int numAvroFiles = avroFiles.size();
+    if (numAvroFiles == 1) {
+      buildSegmentFromAvro(avroFiles.get(0), tableConfig, schema, baseSegmentIndex, segmentDir, tarDir);
+    } else {
+      ExecutorService executorService = Executors.newFixedThreadPool(numAvroFiles);
+      List<Future<Void>> futures = new ArrayList<>(numAvroFiles);
+      for (int i = 0; i < numAvroFiles; i++) {
+        File avroFile = avroFiles.get(i);
+        int segmentIndex = i + baseSegmentIndex;
+        futures.add(executorService.submit(() -> {
+          buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex, segmentDir, tarDir);
+          return null;
+        }));
+      }
+      executorService.shutdown();
+      for (Future<Void> future : futures) {
+        future.get();
+      }
     }
   }
 
   /**
-   * Builds segments from the given Avro files. Each segment will be built using a separate thread.
+   * Builds one Pinot segment from the given Avro file.
    *
-   * @param avroFiles List of Avro files
-   * @param baseSegmentIndex Base segment index number
+   * @param avroFile Avro file
+   * @param tableConfig Pinot table config
+   * @param schema Pinot schema
+   * @param segmentIndex Segment index number
    * @param segmentDir Output directory for the un-tarred segments
    * @param tarDir Output directory for the tarred segments
-   * @param tableName Table name
-   * @param executor Executor
    */
-  public static void buildSegmentsFromAvro(List<File> avroFiles, int baseSegmentIndex, File segmentDir, File tarDir,
-      String tableName, Executor executor) {
-    buildSegmentsFromAvro(avroFiles, baseSegmentIndex, segmentDir, tarDir, tableName, null, null, null, executor);
+  public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
+      org.apache.pinot.spi.data.Schema schema, int segmentIndex, File segmentDir, File tarDir)
+      throws Exception {
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(avroFile.getPath());
+    segmentGeneratorConfig.setOutDir(segmentDir.getPath());
+    segmentGeneratorConfig.setTableName(TableNameBuilder.extractRawTableName(tableConfig.getTableName()));
+    // Test segment with space and special character in the file name
+    segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %");
+
+    // Build the segment
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+
+    // Tar the segment
+    String segmentName = driver.getSegmentName();
+    File indexDir = new File(segmentDir, driver.getSegmentName());
+    TarGzCompressionUtils.createTarGzOfDirectory(indexDir.getPath(), new File(tarDir, segmentName).getPath());
   }
 
   /**
@@ -315,9 +306,8 @@ public class ClusterIntegrationTestUtils {
    * @param partitionColumn Optional partition column
    * @throws Exception
    */
-  public static void pushAvroIntoKafka(@Nonnull List<File> avroFiles, @Nonnull String kafkaBroker,
-      @Nonnull String kafkaTopic, int maxNumKafkaMessagesPerBatch, @Nullable byte[] header,
-      @Nullable String partitionColumn)
+  public static void pushAvroIntoKafka(List<File> avroFiles, String kafkaBroker, String kafkaTopic,
+      int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn)
       throws Exception {
     Properties properties = new Properties();
     properties.put("metadata.broker.list", kafkaBroker);
@@ -364,8 +354,8 @@ public class ClusterIntegrationTestUtils {
    * @throws Exception
    */
   @SuppressWarnings("unused")
-  public static void pushRandomAvroIntoKafka(@Nonnull File avroFile, @Nonnull String kafkaBroker,
-      @Nonnull String kafkaTopic, int numKafkaMessagesToPush, int maxNumKafkaMessagesPerBatch, @Nullable byte[] header,
+  public static void pushRandomAvroIntoKafka(File avroFile, String kafkaBroker, String kafkaTopic,
+      int numKafkaMessagesToPush, int maxNumKafkaMessagesPerBatch, @Nullable byte[] header,
       @Nullable String partitionColumn)
       throws Exception {
     Properties properties = new Properties();
@@ -444,8 +434,7 @@ public class ClusterIntegrationTestUtils {
    * @param fieldType Field type
    * @return Random value for the given field type
    */
-  @Nonnull
-  private static Object generateRandomValue(@Nonnull Schema.Type fieldType) {
+  private static Object generateRandomValue(Schema.Type fieldType) {
     switch (fieldType) {
       case BOOLEAN:
         return RANDOM.nextBoolean();
@@ -480,8 +469,8 @@ public class ClusterIntegrationTestUtils {
    * @param h2Connection H2 connection
    * @throws Exception
    */
-  public static void testQuery(@Nonnull String pinotQuery, @Nonnull String queryFormat, @Nonnull String brokerUrl,
-      @Nonnull org.apache.pinot.client.Connection pinotConnection, @Nullable List<String> sqlQueries,
+  public static void testQuery(String pinotQuery, String queryFormat, String brokerUrl,
+      org.apache.pinot.client.Connection pinotConnection, @Nullable List<String> sqlQueries,
       @Nullable Connection h2Connection)
       throws Exception {
     // Use broker response for metadata check, connection response for value check
@@ -1000,8 +989,8 @@ public class ClusterIntegrationTestUtils {
    * @param failureMessage Failure message
    * @param e Exception
    */
-  private static void failure(@Nonnull String pqlQuery, @Nullable List<String> sqlQueries,
-      @Nonnull String failureMessage, @Nullable Exception e) {
+  private static void failure(String pqlQuery, @Nullable List<String> sqlQueries, String failureMessage,
+      @Nullable Exception e) {
     failureMessage += "\nPQL: " + pqlQuery;
     if (sqlQueries != null) {
       failureMessage += "\nSQL: " + sqlQueries;
@@ -1020,8 +1009,7 @@ public class ClusterIntegrationTestUtils {
    * @param sqlQueries H2 SQL queries
    * @param failureMessage Failure message
    */
-  private static void failure(@Nonnull String pqlQuery, @Nullable List<String> sqlQueries,
-      @Nonnull String failureMessage) {
+  private static void failure(String pqlQuery, @Nullable List<String> sqlQueries, String failureMessage) {
     failure(pqlQuery, sqlQueries, failureMessage, null);
   }
 
@@ -1034,8 +1022,7 @@ public class ClusterIntegrationTestUtils {
    * @param value raw value.
    * @return converted value.
    */
-  @Nonnull
-  private static String convertBooleanToLowerCase(@Nonnull String value) {
+  private static String convertBooleanToLowerCase(String value) {
     if (value.equals("TRUE")) {
       return "true";
     }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 1f221ea..d4f7494 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -23,11 +23,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -52,35 +50,23 @@ import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.minion.MinionStarter;
 import org.apache.pinot.minion.events.MinionEventObserverFactory;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
-import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.server.starter.helix.DefaultHelixStarterServerConfig;
 import org.apache.pinot.server.starter.helix.HelixServerStarter;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 
 /**
@@ -96,17 +82,14 @@ public abstract class ClusterTest extends ControllerTest {
   private List<HelixServerStarter> _serverStarters;
   private MinionStarter _minionStarter;
 
-  // TODO: clean this up
-  private TableConfig _realtimeTableConfig;
-
   protected void startBroker()
       throws Exception {
     startBrokers(1);
   }
 
-  protected void startBroker(int basePort, String zkStr)
+  protected void startBroker(int port, String zkStr)
       throws Exception {
-    startBrokers(1, basePort, zkStr);
+    startBrokers(1, port, zkStr);
   }
 
   protected void startBrokers(int numBrokers)
@@ -245,102 +228,42 @@ public abstract class ClusterTest extends ControllerTest {
     _minionStarter = null;
   }
 
-  protected void addSchema(File schemaFile, String schemaName)
-      throws Exception {
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      fileUploadDownloadClient
-          .addSchema(FileUploadDownloadClient.getUploadSchemaHttpURI(LOCAL_HOST, _controllerPort), schemaName,
-              schemaFile);
-    }
-  }
-
   /**
    * Upload all segments inside the given directory to the cluster.
    *
-   * @param segmentDir Segment directory
+   * @param tarDir Segment directory
    */
-  protected void uploadSegments(String tableName, File segmentDir)
+  protected void uploadSegments(String tableName, File tarDir)
       throws Exception {
-    String[] segmentNames = segmentDir.list();
-    assertNotNull(segmentNames);
+    File[] segmentTarFiles = tarDir.listFiles();
+    assertNotNull(segmentTarFiles);
+    int numSegments = segmentTarFiles.length;
+    assertTrue(numSegments > 0);
+
+    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      final URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
-
-      // Upload all segments in parallel
-      int numSegments = segmentNames.length;
-      ExecutorService executor = Executors.newFixedThreadPool(numSegments);
-      List<Future<Integer>> tasks = new ArrayList<>(numSegments);
-      for (final String segmentName : segmentNames) {
-        final File segmentFile = new File(segmentDir, segmentName);
-        tasks.add(executor.submit(new Callable<Integer>() {
-          @Override
-          public Integer call()
-              throws Exception {
-            return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentName, segmentFile, tableName)
-                .getStatusCode();
-          }
-        }));
-      }
-      for (Future<Integer> task : tasks) {
-        assertEquals((int) task.get(), HttpStatus.SC_OK);
+      if (numSegments == 1) {
+        File segmentTarFile = segmentTarFiles[0];
+        assertEquals(fileUploadDownloadClient
+                .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
+            HttpStatus.SC_OK);
+      } else {
+        // Upload all segments in parallel
+        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
+        List<Future<Integer>> futures = new ArrayList<>(numSegments);
+        for (File segmentTarFile : segmentTarFiles) {
+          futures.add(executorService.submit(() -> fileUploadDownloadClient
+              .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName)
+              .getStatusCode()));
+        }
+        executorService.shutdown();
+        for (Future<Integer> future : futures) {
+          assertEquals((int) future.get(), HttpStatus.SC_OK);
+        }
       }
-      executor.shutdown();
     }
   }
 
-  protected void addOfflineTable(String tableName)
-      throws Exception {
-    addOfflineTable(tableName, SegmentVersion.v1);
-  }
-
-  protected void addOfflineTable(String tableName, SegmentVersion segmentVersion)
-      throws Exception {
-    addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null, null, null, null);
-  }
-
-  protected void addOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
-      String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
-      List<String> bloomFilterColumns, List<String> rangeIndexColumns, TableTaskConfig taskConfig,
-      SegmentPartitionConfig segmentPartitionConfig, String sortedColumn)
-      throws Exception {
-    TableConfig tableConfig =
-        getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
-            invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig,
-            sortedColumn, "daily");
-    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
-  }
-
-  protected void updateOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
-      String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
-      List<String> bloomFilterColumns, List<String> rangeIndexColumns, TableTaskConfig taskConfig,
-      SegmentPartitionConfig segmentPartitionConfig, String sortedColumn)
-      throws Exception {
-    TableConfig tableConfig =
-        getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
-            invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig,
-            sortedColumn, "daily");
-    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonString());
-  }
-
-  private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
-      String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
-      TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig, String sortedColumn,
-      String segmentPushFrequency) {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(timeColumnName)
-        .setTimeType(timeType).setSegmentPushFrequency(segmentPushFrequency).setNumReplicas(1)
-        .setBrokerTenant(brokerTenant).setServerTenant(serverTenant).setLoadMode(loadMode)
-        .setSegmentVersion(segmentVersion.toString()).setInvertedIndexColumns(invertedIndexColumns)
-        .setBloomFilterColumns(bloomFilterColumns).setRangeIndexColumns(rangeIndexColumns).setTaskConfig(taskConfig)
-        .setSegmentPartitionConfig(segmentPartitionConfig).setSortedColumn(sortedColumn).build();
-  }
-
-  protected void dropOfflineTable(String tableName)
-      throws Exception {
-    sendDeleteRequest(
-        _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.OFFLINE.tableNameWithType(tableName)));
-  }
-
   public static class AvroFileSchemaKafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSchemaKafkaAvroMessageDecoder.class);
     public static File avroFile;
@@ -379,123 +302,6 @@ public abstract class ClusterTest extends ControllerTest {
     }
   }
 
-  protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
-      String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
-      String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
-      List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName)
-      throws Exception {
-    addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile,
-        timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
-        bloomFilterColumns, rangeIndexColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, 1);
-  }
-
-  protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
-      String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
-      String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
-      List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName, int numReplicas)
-      throws Exception {
-    addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile,
-        timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
-        bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, numReplicas, null);
-  }
-
-  protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
-      String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
-      String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
-      TableTaskConfig taskConfig, String streamConsumerFactoryName, int numReplicas,
-      List<FieldConfig> fieldConfigListForTextColumns)
-      throws Exception {
-    Map<String, String> streamConfigs = new HashMap<>();
-    String streamType = "kafka";
-    streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    if (useLlc) {
-      // LLC
-      streamConfigs
-          .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-              StreamConfig.ConsumerType.LOWLEVEL.toString());
-      streamConfigs.put(KafkaStreamConfigProperties
-          .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST), kafkaBrokerList);
-    } else {
-      // HLC
-      streamConfigs
-          .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-              StreamConfig.ConsumerType.HIGHLEVEL.toString());
-      streamConfigs.put(KafkaStreamConfigProperties
-              .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_ZK_CONNECTION_STRING),
-          kafkaZkUrl);
-      streamConfigs.put(KafkaStreamConfigProperties
-              .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER),
-          kafkaBrokerList);
-    }
-    streamConfigs.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        streamConsumerFactoryName);
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            kafkaTopic);
-    AvroFileSchemaKafkaAvroMessageDecoder.avroFile = avroFile;
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            AvroFileSchemaKafkaAvroMessageDecoder.class.getName());
-    streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(realtimeSegmentFlushRows));
-    streamConfigs.put(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
-
-    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setLLC(useLlc)
-        .setTimeColumnName(timeColumnName).setTimeType(timeType).setSchemaName(schemaName).setBrokerTenant(brokerTenant)
-        .setServerTenant(serverTenant).setLoadMode(loadMode).setSortedColumn(sortedColumn)
-        .setInvertedIndexColumns(invertedIndexColumns).setBloomFilterColumns(bloomFilterColumns)
-        .setNoDictionaryColumns(noDictionaryColumns).setStreamConfigs(streamConfigs).setTaskConfig(taskConfig)
-        .setNumReplicas(numReplicas).setFieldConfigList(fieldConfigListForTextColumns).build();
-
-    // save the realtime table config
-    _realtimeTableConfig = tableConfig;
-
-    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
-  }
-
-  protected void updateRealtimeTableConfig(String tablename, List<String> invertedIndexCols,
-      List<String> bloomFilterCols)
-      throws Exception {
-
-    IndexingConfig config = _realtimeTableConfig.getIndexingConfig();
-    config.setInvertedIndexColumns(invertedIndexCols);
-    config.setBloomFilterColumns(bloomFilterCols);
-
-    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tablename), _realtimeTableConfig.toJsonString());
-  }
-
-  protected void updateRealtimeTableTenant(String tableName, TenantConfig tenantConfig)
-      throws Exception {
-
-    _realtimeTableConfig.setTenantConfig(tenantConfig);
-
-    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), _realtimeTableConfig.toJsonString());
-  }
-
-  protected void dropRealtimeTable(String tableName)
-      throws Exception {
-    sendDeleteRequest(
-        _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
-  }
-
-  protected void addHybridTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
-      String kafkaTopic, int realtimeSegmentFlushSize, File avroFile, String timeColumnName, String timeType,
-      String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
-      List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName,
-      SegmentPartitionConfig segmentPartitionConfig)
-      throws Exception {
-    addOfflineTable(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, SegmentVersion.v1,
-        invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig, sortedColumn);
-    addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushSize, avroFile,
-        timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
-        bloomFilterColumns, rangeIndexColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
-  }
-
   protected JsonNode getDebugInfo(final String uri)
       throws Exception {
     return JsonUtils.stringToJsonNode(sendGetRequest(_brokerBaseApiUrl + "/" + uri));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
new file mode 100644
index 0000000..86bc3a1
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ValidationMetrics;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TagOverrideConfig;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test for all {@link org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask}s.
+ * The intention of these tests is not to test functionality of daemons, but simply to check that they run as expected
+ * and process the tables when the controller starts.
+ */
+public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 30;
+  private static final int PERIODIC_TASK_FREQUENCY_SECONDS = 5;
+  private static final String PERIODIC_TASK_FREQUENCY = "5s";
+
+  // Set initial delay of 30 seconds for periodic tasks, to allow time for tables setup.
+  // Run at 5 seconds frequency in order to keep them running, in case first run happens before table setup.
+  private static class TestControllerConf extends ControllerConf {
+    private TestControllerConf(ControllerConf controllerConf) {
+      copy(controllerConf);
+    }
+
+    @Override
+    public long getStatusCheckerInitialDelayInSeconds() {
+      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
+    }
+
+    @Override
+    public int getStatusCheckerFrequencyInSeconds() {
+      return PERIODIC_TASK_FREQUENCY_SECONDS;
+    }
+
+    @Override
+    public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
+      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
+    }
+
+    @Override
+    public String getRealtimeSegmentRelocatorFrequency() {
+      return PERIODIC_TASK_FREQUENCY;
+    }
+
+    @Override
+    public long getBrokerResourceValidationInitialDelayInSeconds() {
+      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
+    }
+
+    @Override
+    public int getBrokerResourceValidationFrequencyInSeconds() {
+      return PERIODIC_TASK_FREQUENCY_SECONDS;
+    }
+
+    @Override
+    public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
+      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
+    }
+
+    @Override
+    public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
+      return PERIODIC_TASK_FREQUENCY_SECONDS;
+    }
+  }
+
+  private static final int NUM_REPLICAS = 3;
+  private static final String TENANT_NAME = "TestTenant";
+
+  private String _currentTable = DEFAULT_TABLE_NAME;
+
+  @Override
+  protected String getTableName() {
+    return _currentTable;
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Override
+  protected int getNumReplicas() {
+    return NUM_REPLICAS;
+  }
+
+  @Override
+  protected String getBrokerTenant() {
+    return TENANT_NAME;
+  }
+
+  @Override
+  protected String getServerTenant() {
+    return TENANT_NAME;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    startZk();
+    startKafka();
+
+    TestControllerConf controllerConf = new TestControllerConf(getDefaultControllerConfiguration());
+    controllerConf.setTenantIsolationEnabled(false);
+    startController(controllerConf);
+    startBroker();
+    startServers(6);
+
+    // Create tenants
+    createBrokerTenant(TENANT_NAME, 1);
+    createServerTenant(TENANT_NAME, 3, 3);
+
+    // Unpack the Avro files
+    int numAvroFiles = unpackAvroData(_tempDir).size();
+    int numOfflineAvroFiles = 8;
+    int numRealtimeAvroFiles = 6;
+    // Avro files has to be ordered as time series data
+    List<File> avroFiles = new ArrayList<>(numAvroFiles);
+    for (int i = 1; i <= numAvroFiles; i++) {
+      avroFiles.add(new File(_tempDir, "On_Time_On_Time_Performance_2014_" + i + ".avro"));
+    }
+    List<File> offlineAvroFiles = avroFiles.subList(0, numOfflineAvroFiles);
+    List<File> realtimeAvroFiles = avroFiles.subList(numAvroFiles - numRealtimeAvroFiles, numAvroFiles);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig offlineTableConfig = createOfflineTableConfig();
+    addTableConfig(offlineTableConfig);
+    addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils
+        .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Push data into Kafka
+    pushAvroIntoKafka(realtimeAvroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    String tableName = getTableName();
+    dropOfflineTable(tableName);
+    dropRealtimeTable(tableName);
+
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Test
+  public void testSegmentStatusChecker()
+      throws Exception {
+    String emptyTable = "emptyTable";
+    String disabledTable = "disabledTable";
+    String tableWithOfflineSegment = "tableWithOfflineSegment";
+
+    _currentTable = emptyTable;
+    addTableConfig(createOfflineTableConfig());
+
+    _currentTable = disabledTable;
+    addTableConfig(createOfflineTableConfig());
+    _helixAdmin.enableResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), false);
+
+    _currentTable = tableWithOfflineSegment;
+    addTableConfig(createOfflineTableConfig());
+    uploadSegments(_currentTable, _tarDir);
+    // Turn one replica of a segment OFFLINE
+    HelixHelper.updateIdealState(_helixManager, TableNameBuilder.OFFLINE.tableNameWithType(tableWithOfflineSegment),
+        idealState -> {
+          assertNotNull(idealState);
+          Map<String, String> instanceStateMap = idealState.getRecord().getMapFields().values().iterator().next();
+          instanceStateMap.entrySet().iterator().next().setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+          return idealState;
+        }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
+
+    _currentTable = DEFAULT_TABLE_NAME;
+
+    int numTables = 5;
+    ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
+    TestUtils.waitForCondition(aVoid -> {
+      if (controllerMetrics
+          .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "SegmentStatusChecker")
+          != numTables) {
+        return false;
+      }
+      if (!checkSegmentStatusCheckerMetrics(controllerMetrics, TableNameBuilder.OFFLINE.tableNameWithType(emptyTable),
+          null, 3, 100, 0, 100)) {
+        return false;
+      }
+      if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
+          TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), null, Long.MIN_VALUE, Long.MIN_VALUE,
+          Long.MIN_VALUE, Long.MIN_VALUE)) {
+        return false;
+      }
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+      IdealState idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
+      if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, 3, 100, 0, 100)) {
+        return false;
+      }
+      tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableWithOfflineSegment);
+      idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
+      if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, 2, 66, 0, 100)) {
+        return false;
+      }
+      tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+      idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
+      if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, 3, 100, 0, 100)) {
+        return false;
+      }
+      return controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT) == 4
+          && controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT) == 1
+          && controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT) == 1;
+    }, 60_000, "Timed out waiting for SegmentStatusChecker");
+
+    dropOfflineTable(emptyTable);
+    dropOfflineTable(disabledTable);
+    dropOfflineTable(tableWithOfflineSegment);
+  }
+
+  private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableNameWithType,
+      IdealState idealState, long expectedNumReplicas, long expectedPercentReplicas, long expectedSegmentsInErrorState,
+      long expectedPercentSegmentsAvailable) {
+    if (idealState != null) {
+      if (controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE) != idealState
+          .toString().length()) {
+        return false;
+      }
+      if (controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT) != idealState
+          .getPartitionSet().size()) {
+        return false;
+      }
+    }
+    return controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS)
+        == expectedNumReplicas
+        && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS)
+        == expectedPercentReplicas
+        && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE)
+        == expectedSegmentsInErrorState
+        && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE)
+        == expectedPercentSegmentsAvailable;
+  }
+
+  @Test
+  public void testRealtimeSegmentRelocator()
+      throws Exception {
+    // Add relocation tenant config
+    TableConfig realtimeTableConfig = getRealtimeTableConfig();
+    realtimeTableConfig.setTenantConfig(new TenantConfig(TENANT_NAME, TENANT_NAME,
+        new TagOverrideConfig(TENANT_NAME + "_REALTIME", TENANT_NAME + "_OFFLINE")));
+    updateTableConfig(realtimeTableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      // Check servers for ONLINE segment and CONSUMING segments are disjoint sets
+      Set<String> consumingServers = new HashSet<>();
+      Set<String> completedServers = new HashSet<>();
+      IdealState idealState =
+          _helixResourceManager.getTableIdealState(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+      assertNotNull(idealState);
+      for (Map<String, String> instanceStateMap : idealState.getRecord().getMapFields().values()) {
+        for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+          String state = entry.getValue();
+          if (state.equals(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+            consumingServers.add(entry.getKey());
+          } else if (state.equals(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
+            completedServers.add(entry.getKey());
+          }
+        }
+      }
+      return Collections.disjoint(consumingServers, completedServers);
+    }, 60_000, "Timed out waiting for RealtimeSegmentRelocation");
+  }
+
+  @Test
+  public void testBrokerResourceValidationManager() {
+    // Add a new broker with the same tag
+    String brokerId = "Broker_localhost_1234";
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(brokerId);
+    instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+    String helixClusterName = getHelixClusterName();
+    _helixAdmin.addInstance(helixClusterName, instanceConfig);
+    Set<String> brokersAfterAdd = _helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
+    assertTrue(brokersAfterAdd.contains(brokerId));
+
+    String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+    TestUtils.waitForCondition(aVoid -> {
+      IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName);
+      assertNotNull(idealState);
+      return idealState.getInstanceSet(tableNameWithType).equals(brokersAfterAdd);
+    }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager");
+
+    // Drop the new added broker
+    _helixAdmin.dropInstance(helixClusterName, instanceConfig);
+    Set<String> brokersAfterDrop = _helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
+    assertFalse(brokersAfterDrop.contains(brokerId));
+
+    TestUtils.waitForCondition(input -> {
+      IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName);
+      assertNotNull(idealState);
+      return idealState.getInstanceSet(tableNameWithType).equals(brokersAfterDrop);
+    }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager");
+  }
+
+  @Test
+  public void testOfflineSegmentIntervalChecker() {
+    OfflineSegmentIntervalChecker offlineSegmentIntervalChecker = _controllerStarter.getOfflineSegmentIntervalChecker();
+    ValidationMetrics validationMetrics = offlineSegmentIntervalChecker.getValidationMetrics();
+    String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+    // Wait until OfflineSegmentIntervalChecker gets executed
+    TestUtils.waitForCondition(aVoid -> {
+      long numSegments =
+          validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "SegmentCount"));
+      long numMissingSegments =
+          validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "missingSegmentCount"));
+      long numTotalDocs =
+          validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "TotalDocumentCount"));
+      return numSegments == 8 && numMissingSegments == 0 && numTotalDocs == 79003;
+    }, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker");
+  }
+
+  // TODO: tests for other ControllerPeriodicTasks (RetentionManager, RealtimeSegmentValidationManager)
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
deleted file mode 100644
index 6050314..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.ControllerMeter;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.metrics.ValidationMetrics;
-import org.apache.pinot.common.utils.config.TagNameUtils;
-import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
-import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.TagOverrideConfig;
-import org.apache.pinot.spi.config.table.TenantConfig;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
-import org.testng.ITestContext;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterGroups;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeGroups;
-import org.testng.annotations.Test;
-
-
-/**
- * Integration tests for all {@link org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask}
- * The intention of these tests is not to test functionality of daemons,
- * but simply to check that they run as expected and process the tables when the controller starts.
- *
- * Cluster setup/teardown is common across all tests in the @BeforeClass method {@link ControllerPeriodicTasksIntegrationTests::setup}.
- * This includes:
- * zk, controller, 1 broker, 3 offline servers, 3 realtime servers, kafka with avro loaded, offline table with segments from avro
- *
- * There will be a separate beforeTask(), testTask() and afterTask() for each ControllerPeriodicTask test, grouped by task name.
- * See group = "segmentStatusChecker" for example.
- * The tables needed for the test will be created in beforeTask(), and dropped in afterTask()
- *
- * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation ->
- * brokerResourceValidationManager -> OfflineSegmentIntervalChecker ....
- */
-public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrationTestSet {
-
-  // Controller configuration used in this class.
-  private class TestControllerConf extends ControllerConf {
-    private TestControllerConf(ControllerConf controllerConf) {
-      copy(controllerConf);
-    }
-
-    @Override
-    public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
-      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
-    }
-
-    public long getStatusCheckerInitialDelayInSeconds() {
-      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
-    }
-
-    public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
-      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
-    }
-
-    public long getBrokerResourceValidationInitialDelayInSeconds() {
-      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
-    }
-
-    public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
-      return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
-    }
-  }
-
-  private static final String TENANT_NAME = "TestTenant";
-  private static final String DEFAULT_TABLE_NAME = "mytable";
-
-  private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 60;
-  private static final int PERIODIC_TASK_FREQ_SECONDS = 5;
-  private static final String PERIODIC_TASK_FREQ = "5s";
-
-  private String _currentTableName;
-  private List<File> _avroFiles;
-
-  /**
-   * Setup the cluster for the tests
-   * @throws Exception
-   */
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    startZk();
-    startKafka();
-
-    // Set initial delay of 60 seconds for periodic tasks, to allow time for tables setup.
-    // Run at 5 seconds freq in order to keep them running, in case first run happens before table setup
-    TestControllerConf controllerConf = new TestControllerConf(getDefaultControllerConfiguration());
-    controllerConf.setTenantIsolationEnabled(false);
-    controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
-    controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
-    controllerConf.setBrokerResourceValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
-    controllerConf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
-    controllerConf.setRealtimeSegmentValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
-
-    startController(controllerConf);
-    startBroker();
-    startServers(6);
-
-    // Create tenants
-    createBrokerTenant(TENANT_NAME, 1);
-    createServerTenant(TENANT_NAME, 3, 3);
-
-    // unpack avro into _tempDir
-    _avroFiles = unpackAvroData(_tempDir);
-
-    // setup a default offline table, shared across all tests. Each test can create additional tables and destroy them
-    setupOfflineTableAndSegments(DEFAULT_TABLE_NAME, _avroFiles);
-
-    // push avro into kafka, each test can create the realtime table and destroy it
-    ExecutorService executor = Executors.newCachedThreadPool();
-    pushAvroIntoKafka(_avroFiles, getKafkaTopic(), executor);
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-  }
-
-  /**
-   * Setup offline table, but no segments
-   */
-  private void setupOfflineTable(String table)
-      throws Exception {
-    addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null,
-        null);
-  }
-
-  /**
-   * Setup offline table, with segments from avro
-   */
-  private void setupOfflineTableAndSegments(String tableName, List<File> avroFiles)
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
-    setTableName(tableName);
-
-    File schemaFile = getSchemaFile();
-    Schema schema = Schema.fromFile(schemaFile);
-    String schemaName = schema.getSchemaName();
-    addSchema(schemaFile, schemaName);
-
-    String timeColumnName = getTimeColumnName();
-    FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
-    Assert.assertNotNull(fieldSpec);
-    TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-    TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
-    Assert.assertNotNull(outgoingTimeUnit);
-    String timeType = outgoingTimeUnit.toString();
-
-    addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null,
-        null, null, null, null);
-
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, null, null, null, executor);
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-    uploadSegments(getTableName(), _tarDir);
-    waitForAllDocsLoaded(600_000L);
-  }
-
-  /**
-   * Setup realtime table for given tablename and topic
-   */
-  private void setupRealtimeTable(String table, String topic, File avroFile)
-      throws Exception {
-    File schemaFile = getSchemaFile();
-    Schema schema = Schema.fromFile(schemaFile);
-    String schemaName = schema.getSchemaName();
-    addSchema(schemaFile, schemaName);
-
-    String timeColumnName = getTimeColumnName();
-    FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
-    Assert.assertNotNull(fieldSpec);
-    TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-    TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
-    Assert.assertNotNull(outgoingTimeUnit);
-    String timeType = outgoingTimeUnit.toString();
-
-    addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, topic,
-        getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME,
-        getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
-        getRangeIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
-  }
-
-  @Override
-  public String getTableName() {
-    return _currentTableName;
-  }
-
-  private void setTableName(String tableName) {
-    _currentTableName = tableName;
-  }
-
-  /**
-   * Group - segmentStatusChecker - Integration tests for {@link org.apache.pinot.controller.helix.SegmentStatusChecker}
-   * @throws Exception
-   */
-  @BeforeGroups(groups = "segmentStatusChecker")
-  public void beforeTestSegmentStatusCheckerTest(ITestContext context)
-      throws Exception {
-    String emptyTable = "table1_OFFLINE";
-    String disabledOfflineTable = "table2_OFFLINE";
-    String basicOfflineTable = getDefaultOfflineTableName();
-    String errorOfflineTable = "table4_OFFLINE";
-    String basicRealtimeTable = getDefaultRealtimeTableName();
-    int numTables = 5;
-
-    context.setAttribute("emptyTable", emptyTable);
-    context.setAttribute("disabledOfflineTable", disabledOfflineTable);
-    context.setAttribute("basicOfflineTable", basicOfflineTable);
-    context.setAttribute("errorOfflineTable", errorOfflineTable);
-    context.setAttribute("basicRealtimeTable", basicRealtimeTable);
-    context.setAttribute("numTables", numTables);
-
-    // empty table
-    setupOfflineTable(emptyTable);
-
-    // table with disabled ideal state
-    setupOfflineTable(disabledOfflineTable);
-    _helixAdmin.enableResource(getHelixClusterName(), disabledOfflineTable, false);
-
-    // some segments offline
-    setupOfflineTableAndSegments(errorOfflineTable, _avroFiles);
-    HelixHelper.updateIdealState(_helixManager, errorOfflineTable, new Function<IdealState, IdealState>() {
-      @Nullable
-      @Override
-      public IdealState apply(@Nullable IdealState input) {
-        List<String> segmentNames = Lists.newArrayList(input.getPartitionSet());
-        Collections.sort(segmentNames);
-
-        Map<String, String> instanceStateMap1 = input.getInstanceStateMap(segmentNames.get(0));
-        for (String instance : instanceStateMap1.keySet()) {
-          instanceStateMap1.put(instance, "OFFLINE");
-          break;
-        }
-        return input;
-      }
-    }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
-
-    // setup default realtime table
-    setupRealtimeTable(basicRealtimeTable, getKafkaTopic(), _avroFiles.get(0));
-  }
-
-  /**
-   * After 1 run of SegmentStatusChecker the controllerMetrics will be set for each table
-   * Validate that we are seeing the expected numbers
-   */
-  @Test(groups = "segmentStatusChecker")
-  public void testSegmentStatusChecker(ITestContext context)
-      throws Exception {
-    String emptyTable = (String) context.getAttribute("emptyTable");
-    String disabledOfflineTable = (String) context.getAttribute("disabledOfflineTable");
-    String basicOfflineTable = (String) context.getAttribute("basicOfflineTable");
-    String errorOfflineTable = (String) context.getAttribute("errorOfflineTable");
-    String basicRealtimeTable = (String) context.getAttribute("basicRealtimeTable");
-    int numTables = (int) context.getAttribute("numTables");
-
-    ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
-
-    TestUtils.waitForCondition(input -> controllerMetrics
-            .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "SegmentStatusChecker") >= numTables,
-        240_000, "Timed out waiting for SegmentStatusChecker");
-
-    // empty table - table1_OFFLINE
-    // num replicas set from ideal state
-    checkSegmentStatusCheckerMetrics(controllerMetrics, emptyTable, null, 3, 100, 0, 100);
-
-    // disabled table - table2_OFFLINE
-    // reset to defaults
-    checkSegmentStatusCheckerMetrics(controllerMetrics, disabledOfflineTable, null, Long.MIN_VALUE, Long.MIN_VALUE,
-        Long.MIN_VALUE, Long.MIN_VALUE);
-
-    // happy path table - mytable_OFFLINE
-    IdealState idealState = _helixResourceManager.getTableIdealState(basicOfflineTable);
-    checkSegmentStatusCheckerMetrics(controllerMetrics, basicOfflineTable, idealState, 3, 100, 0, 100);
-
-    // offline segments - table4_OFFLINE
-    // 2 replicas available out of 3, percent 66
-    idealState = _helixResourceManager.getTableIdealState(errorOfflineTable);
-    checkSegmentStatusCheckerMetrics(controllerMetrics, errorOfflineTable, idealState, 2, 66, 0, 100);
-
-    // happy path table - mytable_REALTIME
-    idealState = _helixResourceManager.getTableIdealState(basicRealtimeTable);
-    checkSegmentStatusCheckerMetrics(controllerMetrics, basicRealtimeTable, idealState, 1, 100, 0, 100);
-
-    // Total metrics
-    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT), 4);
-    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT), 1);
-    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 1);
-  }
-
-  private void checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableName,
-      IdealState idealState, long numReplicas, long percentReplicas, long segmentsInErrorState,
-      long percentSegmentsAvailable) {
-    if (idealState != null) {
-      Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
-          idealState.toString().length());
-      Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENT_COUNT),
-          idealState.getPartitionSet().size());
-    }
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS),
-        numReplicas);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS),
-        percentReplicas);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
-        segmentsInErrorState);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
-        percentSegmentsAvailable);
-  }
-
-  @AfterGroups(groups = "segmentStatusChecker")
-  public void afterTestSegmentStatusChecker(ITestContext context)
-      throws Exception {
-    String emptyTable = (String) context.getAttribute("emptyTable");
-    String disabledOfflineTable = (String) context.getAttribute("disabledOfflineTable");
-    String errorOfflineTable = (String) context.getAttribute("errorOfflineTable");
-    String basicRealtimeTable = (String) context.getAttribute("basicRealtimeTable");
-
-    dropOfflineTable(emptyTable);
-    dropOfflineTable(disabledOfflineTable);
-    dropOfflineTable(errorOfflineTable);
-  }
-
-  /**
-   * Group - realtimeSegmentRelocator - Integration tests for {@link org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator}
-   * @param context
-   * @throws Exception
-   */
-  @BeforeGroups(groups = "realtimeSegmentRelocator", dependsOnGroups = "segmentStatusChecker")
-  public void beforeRealtimeSegmentRelocatorTest(ITestContext context)
-      throws Exception {
-    String relocationTable = getDefaultRealtimeTableName();
-    context.setAttribute("relocationTable", relocationTable);
-
-    // add tag override for relocation
-    TenantConfig tenantConfig = new TenantConfig(TENANT_NAME, TENANT_NAME,
-        new TagOverrideConfig(TENANT_NAME + "_REALTIME", TENANT_NAME + "_OFFLINE"));
-    updateRealtimeTableTenant(TableNameBuilder.extractRawTableName(relocationTable), tenantConfig);
-  }
-
-  @Test(groups = "realtimeSegmentRelocator", dependsOnGroups = "segmentStatusChecker")
-  public void testRealtimeSegmentRelocator(ITestContext context)
-      throws Exception {
-
-    String relocationTable = (String) context.getAttribute("relocationTable");
-
-    ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
-
-    long taskRunCount =
-        controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
-            .count();
-    TestUtils.waitForCondition(input ->
-        controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
-            .count() > taskRunCount, 60_000, "Timed out waiting for RealtimeSegmentRelocation to run");
-
-    Assert.assertTrue(controllerMetrics
-        .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "RealtimeSegmentRelocator") > 0);
-
-    // check servers for ONLINE segment and CONSUMING segments are disjoint sets
-    Set<String> consuming = new HashSet<>();
-    Set<String> completed = new HashSet<>();
-    IdealState tableIdealState = _helixResourceManager.getTableIdealState(relocationTable);
-    for (String partition : tableIdealState.getPartitionSet()) {
-      Map<String, String> instanceStateMap = tableIdealState.getInstanceStateMap(partition);
-      if (instanceStateMap.containsValue("CONSUMING")) {
-        consuming.addAll(instanceStateMap.keySet());
-      }
-      if (instanceStateMap.containsValue("ONLINE")) {
-        completed.addAll(instanceStateMap.keySet());
-      }
-    }
-
-    Assert.assertTrue(Collections.disjoint(consuming, completed));
-  }
-
-  @BeforeGroups(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
-  public void beforeBrokerResourceValidationManagerTest(ITestContext context)
-      throws Exception {
-    String table1 = "testTable";
-    String table2 = "testTable2";
-    context.setAttribute("testTableOne", table1);
-    context.setAttribute("testTableTwo", table2);
-    setupOfflineTable(table1);
-  }
-
-  @Test(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
-  public void testBrokerResourceValidationManager(ITestContext context)
-      throws Exception {
-    // Check that the first table we added doesn't need to be rebuilt(case where ideal state brokers and brokers in broker resource are the same.
-    String table1 = (String) context.getAttribute("testTableOne");
-    String table2 = (String) context.getAttribute("testTableTwo");
-    TableConfig tableConfigOne = new TableConfigBuilder(TableType.OFFLINE).setTableName(table1).build();
-    String partitionNameOne = tableConfigOne.getTableName();
-
-    // Ensure that the broker resource is not rebuilt.
-    TestUtils.waitForCondition(input -> {
-      IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
-      return idealState.getInstanceSet(partitionNameOne)
-          .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
-    }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
-
-    // Add another table that needs to be rebuilt
-    TableConfig offlineTableConfigTwo =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(table2).setBrokerTenant(TENANT_NAME)
-            .setServerTenant(TENANT_NAME).build();
-    _helixResourceManager.addTable(offlineTableConfigTwo);
-    String partitionNameTwo = offlineTableConfigTwo.getTableName();
-
-    // Add a new broker manually such that the ideal state is not updated and ensure that rebuild broker resource is called
-    final String brokerId = "Broker_localhost_2";
-    InstanceConfig instanceConfig = new InstanceConfig(brokerId);
-    instanceConfig.setInstanceEnabled(true);
-    instanceConfig.setHostName("Broker_localhost");
-    instanceConfig.setPort("2");
-    _helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
-    _helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(),
-        TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
-
-    // Count the number of times we check on ideal state change, which is made by rebuild broker resource method.
-    AtomicInteger count = new AtomicInteger();
-    TestUtils.waitForCondition(input -> {
-      count.getAndIncrement();
-      IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
-      return idealState.getInstanceSet(partitionNameTwo)
-          .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
-    }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
-
-    // At least the broker resource won't be changed immediately.
-    Assert.assertTrue(count.get() > 1);
-
-    // Drop the instance so that broker resource doesn't match the current one.
-    _helixAdmin.dropInstance(getHelixClusterName(), instanceConfig);
-    count.set(0);
-    TestUtils.waitForCondition(input -> {
-      count.getAndIncrement();
-      IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
-      return idealState.getInstanceSet(partitionNameTwo)
-          .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
-    }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
-
-    // At least the broker resource won't be changed immediately.
-    Assert.assertTrue(count.get() > 1);
-  }
-
-  @AfterGroups(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
-  public void afterBrokerResourceValidationManagerTest(ITestContext context)
-      throws Exception {
-    String table1 = (String) context.getAttribute("testTableOne");
-    String table2 = (String) context.getAttribute("testTableTwo");
-    dropOfflineTable(table1);
-    dropOfflineTable(table2);
-  }
-
-  @Test(groups = "offlineSegmentIntervalChecker", dependsOnGroups = "brokerResourceValidationManager")
-  public void testOfflineSegmentIntervalChecker()
-      throws Exception {
-    OfflineSegmentIntervalChecker offlineSegmentIntervalChecker = _controllerStarter.getOfflineSegmentIntervalChecker();
-    ValidationMetrics validationMetrics = offlineSegmentIntervalChecker.getValidationMetrics();
-
-    String tablNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
-
-    // Wait until OfflineSegmentIntervalChecker gets executed
-    TestUtils.waitForCondition(
-        input -> validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount"))
-            > 0, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker");
-
-    // Test the validation metrics values updated by OfflineSegmentIntervalChecker against the known values
-    // from segment metadata
-    Assert.assertEquals(
-        validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount")), 12);
-    Assert.assertEquals(
-        validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "missingSegmentCount")), 0);
-    Assert.assertEquals(
-        validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "TotalDocumentCount")),
-        115545);
-  }
-
-  /**
-   * Group - realtimeSegmentValidationManager - Integration tests for {@link org.apache.pinot.controller.validation.RealtimeSegmentValidationManager}
-   * @param context
-   * @throws Exception
-   */
-
-  @Test(groups = "realtimeSegmentValidationManager", dependsOnGroups = "offlineSegmentIntervalChecker")
-  public void testRealtimeSegmentValidationManager(ITestContext context)
-      throws Exception {
-    ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
-    long taskRunCount = controllerMetrics
-        .getMeteredTableValue("RealtimeSegmentValidationManager", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
-
-    // Wait until the RealtimeSegmentValidationManager runs at least once. Most likely it already ran once
-    // on the realtime table (default one) already setup, so we should have the total document count on that
-    // realtime table.
-    TestUtils.waitForCondition(input -> controllerMetrics
-        .getMeteredTableValue("RealtimeSegmentValidationManager", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count()
-        > taskRunCount, 60_000, "Timed out waiting for RealtimeSegmentValidationManager to run");
-
-    Assert.assertTrue(controllerMetrics
-        .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "RealtimeSegmentValidationManager")
-        > 0);
-    RealtimeSegmentValidationManager validationManager = _controllerStarter.getRealtimeSegmentValidationManager();
-    ValidationMetrics validationMetrics = validationManager.getValidationMetrics();
-    // Make sure we processed the realtime table to get the total document count. Should have been done the first
-    // time RealtimeSegmentValidationManager ran on the default realtime table.
-    Assert.assertTrue(validationMetrics
-        .getValueOfGauge(ValidationMetrics.makeGaugeName(getDefaultRealtimeTableName(), "TotalDocumentCount")) > 0);
-  }
-
-  // TODO: tests for other ControllerPeriodicTasks (RetentionManagert , RealtimeSegmentValidationManager)
-
-  @Override
-  protected boolean useLlc() {
-    return true;
-  }
-
-  private String getDefaultOfflineTableName() {
-    return DEFAULT_TABLE_NAME + "_OFFLINE";
-  }
-
-  private String getDefaultRealtimeTableName() {
-    return DEFAULT_TABLE_NAME + "_REALTIME";
-  }
-
-  /**
-   * Tear down the cluster after tests
-   * @throws Exception
-   */
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    stopServer();
-    stopBroker();
-    stopController();
-    stopKafka();
-    stopZk();
-    FileUtils.deleteDirectory(_tempDir);
-  }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index a4d302d..9fa9519 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.ConvertToRawIndexTask;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -56,10 +57,16 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
 
   @Nullable
   @Override
-  protected List<String> getRawIndexColumns() {
+  protected List<String> getNoDictionaryColumns() {
     return null;
   }
 
+  // NOTE: Only allow converting raw index for v1 segment
+  @Override
+  protected String getSegmentVersion() {
+    return SegmentVersion.v1.name();
+  }
+
   @Override
   protected TableTaskConfig getTaskConfig() {
     Map<String, String> convertToRawIndexTaskConfigs = new HashMap<>();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
deleted file mode 100644
index ea12bc5..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.io.Files;
-import com.yammer.metrics.core.MetricsRegistry;
-import java.io.File;
-import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
-import org.apache.pinot.core.data.manager.realtime.SegmentCommitter;
-import org.apache.pinot.core.data.manager.realtime.SegmentCommitterFactory;
-import org.apache.pinot.core.data.readers.GenericRowRecordReader;
-import org.apache.pinot.core.data.readers.PinotSegmentUtil;
-import org.apache.pinot.server.realtime.ControllerLeaderLocator;
-import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Integration test that extends RealtimeClusterIntegrationTest to test default commit but uses low-level Kafka consumer.
- */
-public class DefaultCommitterRealtimeIntegrationTest extends RealtimeClusterIntegrationTest {
-  private File _indexDir;
-  private File _realtimeSegmentUntarred;
-
-  private static final String TARGZ_SUFFIX = ".tar.gz";
-  private static final long END_OFFSET = 500L;
-  private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
-  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultCommitterRealtimeIntegrationTest.class);
-
-  @BeforeClass
-  @Override
-  public void setUp()
-      throws Exception {
-    // Remove the consumer directory
-    File consumerDirectory = new File(CONSUMER_DIRECTORY);
-    if (consumerDirectory.exists()) {
-      FileUtils.deleteDirectory(consumerDirectory);
-    }
-
-    startZk();
-    startController();
-    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
-    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
-
-    ControllerLeaderLocator.create(_helixManager);
-
-    // Start Kafka
-    startKafka();
-    List<File> avroFiles = unpackAvroData(_tempDir);
-
-    setUpRealtimeTable(avroFiles.get(0));
-    buildSegment();
-  }
-
-  @Override
-  protected boolean useLlc() {
-    return true;
-  }
-
-  @Test
-  public void testDefaultCommitter()
-      throws Exception {
-    ServerMetrics serverMetrics = new ServerMetrics(new MetricsRegistry());
-    ServerSegmentCompletionProtocolHandler protocolHandler =
-        new ServerSegmentCompletionProtocolHandler(serverMetrics, getTableName());
-
-    LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor =
-        mock(LLRealtimeSegmentDataManager.SegmentBuildDescriptor.class);
-
-    RealtimeSegmentZKMetadata metadata = _helixResourceManager.getRealtimeSegmentMetadata(getTableName()).get(0);
-
-    String instanceId = _helixResourceManager.getAllInstances().get(0);
-
-    SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-    params.withSegmentName(metadata.getSegmentName()).withInstanceId(instanceId).withOffset(END_OFFSET);
-
-    _realtimeSegmentUntarred = new File(_indexDir.getParentFile(), metadata.getSegmentName());
-    FileUtils.copyDirectory(_indexDir, _realtimeSegmentUntarred);
-
-    TarGzCompressionUtils.createTarGzOfDirectory(_realtimeSegmentUntarred.getAbsolutePath());
-
-    // SegmentBuildDescriptor is currently not a static class, so we will mock this object.
-    StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(END_OFFSET);
-    when(segmentBuildDescriptor.getSegmentTarFilePath()).thenReturn(_realtimeSegmentUntarred + TARGZ_SUFFIX);
-    when(segmentBuildDescriptor.getBuildTimeMillis()).thenReturn(0L);
-    when(segmentBuildDescriptor.getOffset()).thenReturn(endOffset);
-    when(segmentBuildDescriptor.getSegmentSizeBytes()).thenReturn(0L);
-    when(segmentBuildDescriptor.getWaitTimeMillis()).thenReturn(0L);
-
-    // Get realtime segment name
-    String segmentList =
-        sendGetRequest(_controllerRequestURLBuilder.forSegmentListAPIWithTableType(getTableName(), "REALTIME"));
-    JsonNode realtimeSegmentsList = getSegmentsFromJsonSegmentAPI(segmentList, TableType.REALTIME.toString());
-    String segmentName = realtimeSegmentsList.get(0).asText();
-
-    // Send segmentConsumed request
-    sendGetRequest("http://localhost:" + DEFAULT_CONTROLLER_PORT + "/segmentConsumed?instance=" + instanceId + "&name="
-        + segmentName + "&offset=" + END_OFFSET);
-
-    SegmentCommitterFactory segmentCommitterFactory = new SegmentCommitterFactory(LOGGER, protocolHandler);
-    SegmentCommitter segmentCommitter = segmentCommitterFactory.createDefaultSegmentCommitter(params);
-    segmentCommitter.commit(segmentBuildDescriptor);
-  }
-
-  public void buildSegment()
-      throws Exception {
-    Schema schema = _helixResourceManager.getSchema(getTableName());
-    TableConfig tableConfig = _helixResourceManager
-        .getTableConfig(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(getTableName()));
-    String _segmentOutputDir = Files.createTempDir().toString();
-    List<GenericRow> _rows = PinotSegmentUtil.createTestData(schema, 1);
-    RecordReader _recordReader = new GenericRowRecordReader(_rows);
-
-    _indexDir = PinotSegmentUtil.createSegment(tableConfig, schema, "segmentName", _segmentOutputDir, _recordReader);
-  }
-
-  private JsonNode getSegmentsFromJsonSegmentAPI(String json, String type)
-      throws Exception {
-    return JsonUtils.stringToJsonNode(json).get(0).get(type);
-  }
-
-  @Override
-  public void tearDown()
-      throws Exception {
-    super.tearDown();
-    _indexDir.deleteOnExit();
-    _realtimeSegmentUntarred.deleteOnExit();
-  }
-
-  @Override
-  public void testDictionaryBasedQueries()
-      throws Exception {
-  }
-
-  @Override
-  public void testGeneratedQueriesWithMultiValues()
-      throws Exception {
-
-  }
-
-  @Override
-  public void testHardcodedSqlQueries()
-      throws Exception {
-  }
-
-  @Override
-  public void testInstanceShutdown()
-      throws Exception {
-  }
-
-  @Override
-  public void testQueriesFromQueryFile()
-      throws Exception {
-  }
-
-  @Override
-  public void testQueryExceptions()
-      throws Exception {
-  }
-
-  @Override
-  public void testSqlQueriesFromQueryFile()
-      throws Exception {
-  }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index ac3b378..b05244f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.testng.annotations.BeforeClass;
 
 
 /**
@@ -36,13 +35,6 @@ import org.testng.annotations.BeforeClass;
  */
 public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
 
-  @BeforeClass
-  @Override
-  public void setUp()
-      throws Exception {
-    super.setUp();
-  }
-
   @Override
   protected String getStreamConsumerFactoryClassName() {
     return FlakyStreamFactory.class.getName();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index e9e8e47..d434d9d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -23,20 +23,15 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -53,8 +48,6 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final int NUM_OFFLINE_SEGMENTS = 8;
   private static final int NUM_REALTIME_SEGMENTS = 6;
 
-  private Schema _schema;
-
   protected int getNumOfflineSegments() {
     return NUM_OFFLINE_SEGMENTS;
   }
@@ -63,6 +56,21 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     return NUM_REALTIME_SEGMENTS;
   }
 
+  @Override
+  protected String getBrokerTenant() {
+    return TENANT_NAME;
+  }
+
+  @Override
+  protected String getServerTenant() {
+    return TENANT_NAME;
+  }
+
+  @Override
+  protected void overrideServerConf(Configuration configuration) {
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT, true);
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -75,32 +83,26 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles);
     List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles);
 
-    ExecutorService executor = Executors.newCachedThreadPool();
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig offlineTableConfig = createOfflineTableConfig();
+    addTableConfig(offlineTableConfig);
+    addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
 
-    // Create segments from Avro data
-    File schemaFile = getSchemaFile();
-    _schema = Schema.fromFile(schemaFile);
+    // Create and upload segments
     ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(offlineAvroFiles, 0, _segmentDir, _tarDir, getTableName(), null, getRawIndexColumns(),
-            _schema, executor);
-
-    // Push data into the Kafka topic
-    pushAvroIntoKafka(realtimeAvroFiles, getKafkaTopic(), executor);
-
-    // Load data into H2
-    setUpH2Connection(avroFiles, executor);
-
-    // Initialize query generator
-    setUpQueryGenerator(avroFiles, executor);
+        .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
 
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
+    // Push data into Kafka
+    pushAvroIntoKafka(realtimeAvroFiles);
 
-    // Create Pinot table
-    setUpTable(avroFiles.get(0));
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
 
-    // Upload all segments
-    uploadSegments(getTableName(), _tarDir);
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
@@ -124,30 +126,6 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     createServerTenant(TENANT_NAME, 1, 1);
   }
 
-  protected void setUpTable(File avroFile)
-      throws Exception {
-    String schemaName = _schema.getSchemaName();
-    addSchema(getSchemaFile(), schemaName);
-
-    String timeColumnName = getTimeColumnName();
-    FieldSpec fieldSpec = _schema.getFieldSpecFor(timeColumnName);
-    Assert.assertNotNull(fieldSpec);
-    TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-    TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
-    Assert.assertNotNull(outgoingTimeUnit);
-    String timeType = outgoingTimeUnit.toString();
-
-    addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
-        getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
-        TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRangeIndexColumns(),
-        getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
-  }
-
-  @Override
-  protected void overrideServerConf(Configuration configuration) {
-    configuration.setProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT, true);
-  }
-
   protected List<File> getAllAvroFiles()
       throws Exception {
     // Unpack the Avro files
@@ -308,7 +286,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
   public void tearDown()
       throws Exception {
     // Try deleting the tables and check that they have no routing table
-    final String tableName = getTableName();
+    String tableName = getTableName();
     dropOfflineTable(tableName);
     dropRealtimeTable(tableName);
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index 8935966..9df57ee 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -28,22 +28,20 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.broker.requesthandler.PinotQueryRequest;
+import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.query.comparison.QueryComparison;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.ITestResult;
@@ -112,8 +110,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
 
     CustomHybridClusterIntegrationTest._tableName = args[argIdx++];
     File schemaFile = new File(args[argIdx++]);
-    Preconditions.checkState(schemaFile.isFile());
-    CustomHybridClusterIntegrationTest._schemaFile = schemaFile;
+    CustomHybridClusterIntegrationTest._schema = Schema.fromFile(schemaFile);
     File dataDir = new File(args[argIdx++]);
     Preconditions.checkState(dataDir.isDirectory());
     CustomHybridClusterIntegrationTest._dataDir = dataDir;
@@ -165,11 +162,11 @@ public class HybridClusterIntegrationTestCommandLineRunner {
     private static final String TENANT_NAME = "TestTenant";
     private static final int ZK_PORT = 3191;
     private static final String ZK_STR = "localhost:" + ZK_PORT;
-    private static final String KAFKA_ZK_STR = ZK_STR + "/kafka";
+    private static final int NUM_KAFKA_BROKERS = 1;
     private static final int KAFKA_PORT = 20092;
-    private static final String KAFKA_BROKER = "localhost:" + KAFKA_PORT;
+    private static final String KAFKA_ZK_STR = ZK_STR + "/kafka";
     private static final int CONTROLLER_PORT = 9998;
-    private static final int BROKER_BASE_PORT = 19099;
+    private static final int BROKER_PORT = 19099;
     private static final int SERVER_BASE_ADMIN_API_PORT = 9097;
     private static final int SERVER_BASE_NETTY_PORT = 9098;
 
@@ -181,7 +178,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
     private static boolean _enabled = false;
     private static boolean _useLlc = false;
     private static String _tableName;
-    private static File _schemaFile;
+    private static Schema _schema;
     private static File _dataDir;
     private static List<String> _invertedIndexColumns;
     private static String _sortedColumn;
@@ -190,7 +187,6 @@ public class HybridClusterIntegrationTestCommandLineRunner {
     private List<File> _realtimeAvroFiles;
     private File _queryFile;
     private File _responseFile;
-    private StreamDataServerStartable _kafkaStarter;
     private long _countStarResult;
 
     public CustomHybridClusterIntegrationTest() {
@@ -222,13 +218,24 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       Assert.assertTrue(_responseFile.isFile());
     }
 
-    @Nonnull
     @Override
     protected String getTableName() {
       return _tableName;
     }
 
     @Override
+    protected String getSchemaName() {
+      return _schema.getSchemaName();
+    }
+
+    @Nullable
+    @Override
+    protected String getTimeColumnName() {
+      TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec();
+      return timeFieldSpec != null ? timeFieldSpec.getName() : null;
+    }
+
+    @Override
     protected long getCountStarResult() {
       return _countStarResult;
     }
@@ -244,6 +251,64 @@ public class HybridClusterIntegrationTestCommandLineRunner {
     }
 
     @Override
+    protected int getNumKafkaBrokers() {
+      return NUM_KAFKA_BROKERS;
+    }
+
+    @Override
+    protected int getBaseKafkaPort() {
+      return KAFKA_PORT;
+    }
+
+    @Override
+    protected String getKafkaZKAddress() {
+      return KAFKA_ZK_STR;
+    }
+
+    @Override
+    protected String getSortedColumn() {
+      return _sortedColumn;
+    }
+
+    @Override
+    protected List<String> getInvertedIndexColumns() {
+      return _invertedIndexColumns;
+    }
+
+    @Nullable
+    @Override
+    protected List<String> getNoDictionaryColumns() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    protected List<String> getRangeIndexColumns() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    protected List<String> getBloomFilterColumns() {
+      return null;
+    }
+
+    @Override
+    protected String getLoadMode() {
+      return ReadMode.mmap.name();
+    }
+
+    @Override
+    protected String getBrokerTenant() {
+      return TENANT_NAME;
+    }
+
+    @Override
+    protected String getServerTenant() {
+      return TENANT_NAME;
+    }
+
+    @Override
     protected long getCurrentCountStarResult()
         throws Exception {
       return postQuery("SELECT COUNT(*) FROM " + getTableName()).get("aggregationResults").get(0).get("value").asLong();
@@ -260,11 +325,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
 
       // Start Zk and Kafka
       startZk(ZK_PORT);
-      _kafkaStarter = KafkaStarterUtils.startServer(KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID, KAFKA_ZK_STR,
-          KafkaStarterUtils.getDefaultKafkaConfiguration());
-
-      // Create Kafka topic
-      _kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
+      startKafka();
 
       // Start the Pinot cluster
       ControllerConf config = getDefaultControllerConfiguration();
@@ -272,37 +333,22 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       config.setZkStr(ZK_STR);
       config.setTenantIsolationEnabled(false);
       startController(config);
-      startBroker(BROKER_BASE_PORT, ZK_STR);
+      startBroker(BROKER_PORT, ZK_STR);
       startServers(2, SERVER_BASE_ADMIN_API_PORT, SERVER_BASE_NETTY_PORT, ZK_STR);
 
       // Create tenants
       createBrokerTenant(TENANT_NAME, 1);
       createServerTenant(TENANT_NAME, 1, 1);
 
-      // Create segments from Avro data
-      ExecutorService executor = Executors.newCachedThreadPool();
-      Schema schema = Schema.fromFile(_schemaFile);
+      // Create and upload the schema and table config
+      addSchema(_schema);
+      TableConfig offlineTableConfig = createOfflineTableConfig();
+      addTableConfig(offlineTableConfig);
+      addTableConfig(createRealtimeTableConfig(_realtimeAvroFiles.get(0)));
+
+      // Create and upload segments
       ClusterIntegrationTestUtils
-          .buildSegmentsFromAvro(_offlineAvroFiles, 0, _segmentDir, _tarDir, _tableName, null, getRawIndexColumns(),
-              schema, executor);
-      executor.shutdown();
-      executor.awaitTermination(10, TimeUnit.MINUTES);
-
-      // Create Pinot table
-      String schemaName = schema.getSchemaName();
-      addSchema(_schemaFile, schemaName);
-      String timeColumnName = getTimeColumnName();
-      FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
-      Assert.assertNotNull(fieldSpec);
-      TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-      TimeUnit outgoingTimeUnit = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
-      Assert.assertNotNull(outgoingTimeUnit);
-      String timeType = outgoingTimeUnit.toString();
-      addHybridTable(_tableName, _useLlc, KAFKA_BROKER, KAFKA_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(),
-          _realtimeAvroFiles.get(0), timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, "MMAP",
-          _sortedColumn, _invertedIndexColumns, null, null, null, null, getStreamConsumerFactoryClassName(), null);
-
-      // Upload all segments
+          .buildSegmentsFromAvro(_offlineAvroFiles, offlineTableConfig, _schema, 0, _segmentDir, _tarDir);
       uploadSegments(getTableName(), _tarDir);
     }
 
@@ -320,9 +366,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       try (BufferedReader responseFileReader = new BufferedReader(new FileReader(_responseFile))) {
         for (File realtimeAvroFile : _realtimeAvroFiles) {
           // Push one avro file into the Kafka topic
-          ClusterIntegrationTestUtils
-              .pushAvroIntoKafka(Collections.singletonList(realtimeAvroFile), KAFKA_BROKER, getKafkaTopic(),
-                  getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
+          pushAvroIntoKafka(Collections.singletonList(realtimeAvroFile));
 
           try (BufferedReader queryFileReader = new BufferedReader(new FileReader(_queryFile))) {
             // Set the expected COUNT(*) result and wait for all documents loaded
@@ -344,7 +388,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
                 public void run() {
                   try {
                     JsonNode actualResponse =
-                        postQuery(new PinotQueryRequest("pql", currentQuery), "http://localhost:" + BROKER_BASE_PORT);
+                        postQuery(new PinotQueryRequest("pql", currentQuery), "http://localhost:" + BROKER_PORT);
                     if (QueryComparison.compareWithEmpty(actualResponse, expectedResponse)
                         == QueryComparison.ComparisonStatus.FAILED) {
                       numFailedQueries.getAndIncrement();
@@ -383,7 +427,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       stopServer();
       stopBroker();
       stopController();
-      _kafkaStarter.stop();
+      stopKafka();
       stopZk();
 
       FileUtils.deleteDirectory(_tempDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
index db22049..1bb6018 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,25 +28,19 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -55,128 +48,75 @@ import org.testng.annotations.Test;
 
 
 public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
+  private static final int NUM_TOTAL_DOCS = 1000;
+  private final List<String> sortedSequenceIds = new ArrayList<>(NUM_TOTAL_DOCS);
 
-  protected static final String DEFAULT_TABLE_NAME = "myTable";
-  static final long TOTAL_DOCS = 1_000L;
-  private static final Logger LOGGER = LoggerFactory.getLogger(JsonPathClusterIntegrationTest.class);
-  private final List<String> sortedSequenceIds = new ArrayList<>();
-  protected Schema _schema;
-  private String _currentTable;
-
-  @Nonnull
-  @Override
-  protected String getTableName() {
-    return _currentTable;
-  }
-
-  @Nonnull
   @Override
-  protected String getSchemaFileName() {
-    return "";
+  protected long getCountStarResult() {
+    return NUM_TOTAL_DOCS;
   }
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
     // Start the Pinot cluster
     startZk();
     startController();
     startBroker();
-    startServers(1);
-
-    _schema = new Schema();
-
-    FieldSpec jsonFieldSpec = new DimensionFieldSpec();
-    jsonFieldSpec.setDataType(DataType.STRING);
-    jsonFieldSpec.setDefaultNullValue("");
-    jsonFieldSpec.setName("myMap");
-    jsonFieldSpec.setSingleValueField(true);
-    _schema.addField(jsonFieldSpec);
-    FieldSpec jsonStrFieldSpec = new DimensionFieldSpec();
-    jsonStrFieldSpec.setDataType(DataType.STRING);
-    jsonStrFieldSpec.setDefaultNullValue("");
-    jsonStrFieldSpec.setName("myMapStr");
-    jsonStrFieldSpec.setSingleValueField(true);
-    _schema.addField(jsonStrFieldSpec);
-    FieldSpec complexJsonStrFieldSpec = new DimensionFieldSpec();
-    complexJsonStrFieldSpec.setDataType(DataType.STRING);
-    complexJsonStrFieldSpec.setDefaultNullValue("");
-    complexJsonStrFieldSpec.setName("complexMapStr");
-    complexJsonStrFieldSpec.setSingleValueField(true);
-    _schema.addField(complexJsonStrFieldSpec);
-
-    // Create the tables
-    ArrayList<String> invertedIndexColumns = Lists.newArrayList();
-    addOfflineTable(DEFAULT_TABLE_NAME, null, null, null, null, null, SegmentVersion.v1, invertedIndexColumns, null,
-        null, null, null, null);
-
-    setUpSegmentsAndQueryGenerator();
+    startServer();
+
+    // Create and upload the schema and table config
+    String rawTableName = getTableName();
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(rawTableName).addSingleValueDimension("myMap", DataType.STRING)
+            .addSingleValueDimension("myMapStr", DataType.STRING)
+            .addSingleValueDimension("complexMapStr", DataType.STRING).build();
+    addSchema(schema);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).build();
+    addTableConfig(tableConfig);
+
+    // Create and upload segments
+    File avroFile = createAvroFile();
+    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(rawTableName, _tarDir);
 
     // Wait for all documents loaded
-    _currentTable = DEFAULT_TABLE_NAME;
     waitForAllDocsLoaded(60_000);
   }
 
-  @Override
-  protected long getCountStarResult() {
-    return TOTAL_DOCS;
-  }
-
-  protected void setUpSegmentsAndQueryGenerator()
+  private File createAvroFile()
       throws Exception {
-    List<Field> fields = new ArrayList<>();
-    fields.add(new Field("myMapStr", org.apache.avro.Schema.create(Type.STRING), "", null));
-    fields.add(new Field("complexMapStr", org.apache.avro.Schema.create(Type.STRING), "", null));
-    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", "some desc", null, false);
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    List<Field> fields = Arrays.asList(new Field("myMapStr", org.apache.avro.Schema.create(Type.STRING), null, null),
+        new Field("complexMapStr", org.apache.avro.Schema.create(Type.STRING), null, null));
     avroSchema.setFields(fields);
 
-    DataFileWriter<GenericData.Record> recordWriter =
-        new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(avroSchema));
-    String parent = "/tmp/mapTest";
-    File avroFile = new File(parent, "part-" + 0 + ".avro");
-    avroFile.getParentFile().mkdirs();
-    recordWriter.create(avroSchema, avroFile);
-
-    for (int i = 0; i < TOTAL_DOCS; i++) {
-      Map<String, String> map = new HashMap<>();
-      map.put("k1", "value-k1-" + i);
-      map.put("k2", "value-k2-" + i);
-      GenericData.Record record = new GenericData.Record(avroSchema);
-      record.put("myMapStr", JsonUtils.objectToString(map));
-
-      Map<String, Object> complexMap = new HashMap<>();
-      complexMap.put("k1", "value-k1-" + i);
-      complexMap.put("k2", "value-k2-" + i);
-      complexMap.put("k3", Arrays.asList("value-k3-0-" + i, "value-k3-1-" + i, "value-k3-2-" + i));
-      complexMap.put("k4", ImmutableMap
-          .of("k4-k1", "value-k4-k1-" + i, "k4-k2", "value-k4-k2-" + i, "k4-k3", "value-k4-k3-" + i, "met", i));
-      record.put("complexMapStr", JsonUtils.objectToString(complexMap));
-      recordWriter.append(record);
-      sortedSequenceIds.add(String.valueOf(i));
+    File avroFile = new File(_tempDir, "data.avro");
+    try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      fileWriter.create(avroSchema, avroFile);
+      for (int i = 0; i < NUM_TOTAL_DOCS; i++) {
+        Map<String, String> map = new HashMap<>();
+        map.put("k1", "value-k1-" + i);
+        map.put("k2", "value-k2-" + i);
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put("myMapStr", JsonUtils.objectToString(map));
+
+        Map<String, Object> complexMap = new HashMap<>();
+        complexMap.put("k1", "value-k1-" + i);
+        complexMap.put("k2", "value-k2-" + i);
+        complexMap.put("k3", Arrays.asList("value-k3-0-" + i, "value-k3-1-" + i, "value-k3-2-" + i));
+        complexMap.put("k4", ImmutableMap
+            .of("k4-k1", "value-k4-k1-" + i, "k4-k2", "value-k4-k2-" + i, "k4-k3", "value-k4-k3-" + i, "met", i));
+        record.put("complexMapStr", JsonUtils.objectToString(complexMap));
+        fileWriter.append(record);
+        sortedSequenceIds.add(String.valueOf(i));
+      }
     }
     Collections.sort(sortedSequenceIds);
-    recordWriter.close();
-
-    // Unpack the Avro files
-    List<File> avroFiles = Lists.newArrayList(avroFile);
-
-    // Create and upload segments without star tree indexes from Avro data
-    createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false);
-  }
-
-  private void createAndUploadSegments(List<File> avroFiles, String tableName, boolean createStarTreeIndex)
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
-
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, null, null, _schema, executor);
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
 
-    uploadSegments(getTableName(), _tarDir);
+    return avroFile;
   }
 
   @Test
@@ -237,7 +177,6 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
     JsonNode pinotResponse = postQuery(pqlQuery);
     ArrayNode selectionResults = (ArrayNode) pinotResponse.get("selectionResults").get("results");
 
-    LOGGER.info("PQL Query: {}, Response: {}", pqlQuery, selectionResults);
     Assert.assertNotNull(selectionResults);
     Assert.assertTrue(selectionResults.size() > 0);
     for (int i = 0; i < selectionResults.size(); i++) {
@@ -264,9 +203,8 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
         + "  where jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING') = 'value-k4-k1-0'";
     pinotResponse = postQuery(pqlQuery);
     selectionResults = (ArrayNode) pinotResponse.get("selectionResults").get("results");
-    LOGGER.info("PQL Query: {}, Response: {}", pqlQuery, selectionResults);
     Assert.assertNotNull(selectionResults);
-    Assert.assertTrue(selectionResults.size() == 1);
+    Assert.assertEquals(selectionResults.size(), 1);
     for (int i = 0; i < selectionResults.size(); i++) {
       String value = selectionResults.get(i).get(0).textValue();
       Assert.assertEquals(value,
@@ -275,17 +213,16 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
 
     //selection order by
     pqlQuery = "Select complexMapStr from " + DEFAULT_TABLE_NAME
-        + " order by jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING') DESC LIMIT " + TOTAL_DOCS;
+        + " order by jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING') DESC LIMIT " + NUM_TOTAL_DOCS;
     pinotResponse = postQuery(pqlQuery);
     selectionResults = (ArrayNode) pinotResponse.get("selectionResults").get("results");
-    LOGGER.info("PQL Query: {}, Response: {}", pqlQuery, selectionResults);
     Assert.assertNotNull(selectionResults);
     Assert.assertTrue(selectionResults.size() > 0);
     for (int i = 0; i < selectionResults.size(); i++) {
       String value = selectionResults.get(i).get(0).textValue();
       Assert.assertTrue(value.indexOf("-k1-") > 0);
       Map results = JsonUtils.stringToObject(value, Map.class);
-      String seqId = sortedSequenceIds.get((int) (TOTAL_DOCS - 1 - i));
+      String seqId = sortedSequenceIds.get(NUM_TOTAL_DOCS - 1 - i);
       Assert.assertEquals(results.get("k1"), "value-k1-" + seqId);
       Assert.assertEquals(results.get("k2"), "value-k2-" + seqId);
       final List k3 = (List) results.get("k3");
@@ -303,14 +240,13 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
     pqlQuery = "Select sum(jsonExtractScalar(complexMapStr,'$.k4.met','INT')) from " + DEFAULT_TABLE_NAME
         + " group by jsonExtractScalar(complexMapStr,'$.k1','STRING')";
     pinotResponse = postQuery(pqlQuery);
-    LOGGER.info("PQL Query: {}, Response: {}", pqlQuery, pinotResponse);
     Assert.assertNotNull(pinotResponse.get("aggregationResults"));
     JsonNode groupByResult = pinotResponse.get("aggregationResults").get(0).get("groupByResult");
     Assert.assertNotNull(groupByResult);
     Assert.assertTrue(groupByResult.isArray());
     Assert.assertTrue(groupByResult.size() > 0);
     for (int i = 0; i < groupByResult.size(); i++) {
-      String seqId = sortedSequenceIds.get((int) (TOTAL_DOCS - 1 - i));
+      String seqId = sortedSequenceIds.get(NUM_TOTAL_DOCS - 1 - i);
       final JsonNode groupbyRes = groupByResult.get(i);
       Assert.assertEquals(groupbyRes.get("group").get(0).asText(), "value-k1-" + seqId);
       Assert.assertEquals(groupbyRes.get("value").asDouble(), Double.parseDouble(seqId));
@@ -320,7 +256,7 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest {
   @AfterClass
   public void tearDown()
       throws Exception {
-    dropOfflineTable(DEFAULT_TABLE_NAME);
+    dropOfflineTable(getTableName());
 
     stopServer();
     stopBroker();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 5ba2b5a..d77862f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -24,10 +24,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import org.apache.avro.reflect.Nullable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -56,58 +56,57 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
 
   private final boolean _isDirectAlloc = RANDOM.nextBoolean();
   private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+  private final boolean _enableSplitCommit = RANDOM.nextBoolean();
   private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
   private final long _startTime = System.currentTimeMillis();
 
-  @BeforeClass
   @Override
-  public void setUp()
-      throws Exception {
-    System.out.println(String.format(
-        "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableLeadControllerResource: %s",
-        RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableLeadControllerResource));
-
-    // Remove the consumer directory
-    File consumerDirectory = new File(CONSUMER_DIRECTORY);
-    if (consumerDirectory.exists()) {
-      FileUtils.deleteDirectory(consumerDirectory);
-    }
+  protected boolean useLlc() {
+    return true;
+  }
 
-    super.setUp();
+  @Override
+  protected String getLoadMode() {
+    return ReadMode.mmap.name();
   }
 
   @Override
   public void startController() {
     ControllerConf controllerConfig = getDefaultControllerConfiguration();
     controllerConfig.setHLCTablesAllowed(false);
-    controllerConfig.setSplitCommit(true);
+    controllerConfig.setSplitCommit(_enableSplitCommit);
     startController(controllerConfig);
     enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
   }
 
   @Override
-  protected boolean useLlc() {
-    return true;
-  }
-
-  @Nullable
-  @Override
-  protected String getLoadMode() {
-    return "MMAP";
-  }
-
-  @Override
   protected void overrideServerConf(Configuration configuration) {
     configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
     configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
     if (_isConsumerDirConfigured) {
       configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
     }
-    configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
-    configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+    if (_enableSplitCommit) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+    }
     configuration.setProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT, true);
   }
 
+  @BeforeClass
+  @Override
+  public void setUp()
+      throws Exception {
+    System.out.println(String.format(
+        "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s",
+        RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource));
+
+    // Remove the consumer directory
+    FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
+
+    super.setUp();
+  }
+
   @Test
   public void testConsumerDirectoryExists() {
     File consumerDirectory = new File(CONSUMER_DIRECTORY, "mytable_REALTIME");
@@ -136,8 +135,10 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
     assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
     assertTrue(queryResponse.get("numEntriesScannedInFilter").asLong() > 0L);
 
-    updateRealtimeTableConfig(getTableName(), UPDATED_INVERTED_INDEX_COLUMNS, null);
-    sendPostRequest(_controllerRequestURLBuilder.forTableReload(getTableName(), "realtime"), null);
+    TableConfig tableConfig = getRealtimeTableConfig();
+    tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
+    updateTableConfig(tableConfig);
+    reloadRealtimeTable(getTableName());
 
     TestUtils.waitForCondition(aVoid -> {
       try {
@@ -165,6 +166,6 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
   @Test
   public void testReload()
       throws Exception {
-    super.testReload(false);
+    testReload(false);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
index 19bfcbb..6f7b408 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
@@ -18,61 +18,99 @@
  */
 package org.apache.pinot.integration.tests;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.TextNode;
-import com.google.common.collect.Lists;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.TimeUtils;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
 
 /**
  * Cluster integration test for near realtime text search
  */
-@Test(enabled=false)
-public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest {
+  private static final String TEXT_COLUMN_NAME = "skills";
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private static final int NUM_SKILLS = 24;
+  private static final int NUM_MATCHING_SKILLS = 4;
+  private static final int NUM_RECORDS = NUM_SKILLS * 1000;
+  private static final int NUM_MATCHING_RECORDS = NUM_MATCHING_SKILLS * 1000;
+
+  private static final String TEST_TEXT_COLUMN_QUERY =
+      "SELECT COUNT(*) FROM mytable WHERE TEXT_MATCH(skills, '\"machine learning\" AND spark')";
+
+  @Override
+  public String getTimeColumnName() {
+    return TIME_COLUMN_NAME;
+  }
 
-  private static final String TABLE_NAME = "mytable";
-  private static final String SKILLS_TEXT_COL_NAME = "SKILLS_TEXT_COL";
-  private static final String TIME_COL_NAME = "TIME_COL";
-  private static final String INT_COL_NAME = "INT_COL";
-  private static final int INT_BASE_VALUE = 10000;
-  Schema _schema;
+  // TODO: Support Lucene index on HLC consuming segments
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getInvertedIndexColumns() {
+    return null;
+  }
+
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    return Collections.singletonList(TEXT_COLUMN_NAME);
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getRangeIndexColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getBloomFilterColumns() {
+    return null;
+  }
+
+  @Override
+  protected List<FieldConfig> getFieldConfigs() {
+    return Collections.singletonList(
+        new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null));
+  }
 
   @BeforeClass
   public void setUp()
       throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
 
-    _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-        .addSingleValueDimension(SKILLS_TEXT_COL_NAME, FieldSpec.DataType.STRING)
-        .addMetric(INT_COL_NAME, FieldSpec.DataType.INT)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COL_NAME), null)
-        .build();
-
     // Start the Pinot cluster
     startZk();
     startController();
@@ -82,37 +120,34 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration
     // Start Kafka
     startKafka();
 
-    // Unpack the Avro files
+    // Create the Avro file
     File avroFile = createAvroFile();
 
-    ExecutorService executor = Executors.newCachedThreadPool();
-
-    // Push data into the Kafka topic
-    pushAvroIntoKafka(Lists.newArrayList(avroFile), getKafkaTopic(), executor);
-
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-
-    // Create Pinot table
-    addSchema(_schema);
-    List<FieldConfig> textIndexColumns = new ArrayList<>();
-    FieldConfig fieldConfig = new FieldConfig(SKILLS_TEXT_COL_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null);
-    textIndexColumns.add(fieldConfig);
-
-    addRealtimeTable(TABLE_NAME, true, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
-        getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, null, null, TABLE_NAME,
-        getBrokerTenant(), getServerTenant(), getLoadMode(), null, null,
-        null, null, getTaskConfig(), getStreamConsumerFactoryClassName(),
-        1, textIndexColumns);
-
-    // just wait for 2sec for few docs to be loaded
-    waitForDocsLoaded(2000L, false );
+    // Create and upload the schema and table config
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+        .addSingleValueDimension(TEXT_COLUMN_NAME, FieldSpec.DataType.STRING)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN_NAME), null)
+        .build();
+    addSchema(schema);
+    addTableConfig(createRealtimeTableConfig(avroFile));
+
+    // Push data into Kafka
+    pushAvroIntoKafka(Collections.singletonList(avroFile));
+
+    // Wait until the table is queryable
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return getCurrentCountStarResult() >= 0;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 10_000L, "Failed to get COUNT(*) result");
   }
 
   @AfterClass
   public void tearDown()
       throws Exception {
-    dropRealtimeTable(TABLE_NAME);
+    dropRealtimeTable(getTableName());
     stopServer();
     stopBroker();
     stopController();
@@ -123,66 +158,61 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration
 
   private File createAvroFile()
       throws Exception {
-    // read the skills file
-    String[] skills = new String[100];
-    int skillCount = 0;
-    try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt");
-        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+    // Read all skills from the skill file
+    InputStream inputStream = getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt");
+    assertNotNull(inputStream);
+    List<String> skills = new ArrayList<>(NUM_SKILLS);
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
       String line;
       while ((line = reader.readLine()) != null) {
-        skills[skillCount++] = line;
+        skills.add(line);
       }
     }
-
-    org.apache.avro.Schema avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(_schema);
-    DataFileWriter avroRecordWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema));
-    String pathToAvroFile = _tempDir.getAbsolutePath() + "/" + "skills.avro";
-    File outputAvroFile = new File(pathToAvroFile);
-    avroRecordWriter.create(avroSchema, outputAvroFile);
-
-    int counter = 0;
-    Random random = new Random();
-    // create Avro file with 100k documents
-    // it will be later pushed to Kafka
-    while (counter < 100000) {
-      GenericData.Record record = new GenericData.Record(avroSchema);
-      record.put(INT_COL_NAME, INT_BASE_VALUE + counter);
-      if (counter >= skillCount) {
-        int index = random.nextInt(skillCount);
-        record.put(SKILLS_TEXT_COL_NAME, skills[index]);
-      } else {
-        record.put(SKILLS_TEXT_COL_NAME, skills[counter]);
+    assertEquals(skills.size(), NUM_SKILLS);
+
+    File avroFile = new File(_tempDir, "data.avro");
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(Arrays.asList(new org.apache.avro.Schema.Field(TEXT_COLUMN_NAME,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, null),
+        new org.apache.avro.Schema.Field(TIME_COLUMN_NAME,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, null)));
+    try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      fileWriter.create(avroSchema, avroFile);
+      for (int i = 0; i < NUM_RECORDS; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(TEXT_COLUMN_NAME, skills.get(i % NUM_SKILLS));
+        record.put(TIME_COLUMN_NAME, System.currentTimeMillis());
+        fileWriter.append(record);
       }
-      record.put(TIME_COL_NAME, TimeUtils.getValidMinTimeMillis());
-      avroRecordWriter.append(record);
-      counter++;
     }
-
-    avroRecordWriter.close();
-    return outputAvroFile;
+    return avroFile;
   }
 
-
-  // we need to make this more deterministic. internal release builds
-  // are failing intermittently. disable until we make it reasonably deterministic
-  @Test(enabled=false)
-  public void testTextSearchCountQuery() throws Exception {
-    String pqlQuery =
-        "SELECT count(*) FROM " + TABLE_NAME + " WHERE text_match(SKILLS_TEXT_COL, '\"machine learning\" AND spark') LIMIT 1000000";
-    int prevResult = 0;
-    // run the same query 2000 times and see an increasing number of hits in the index and count(*) result
-    for (int i = 0; i < 2000; i++) {
-      JsonNode pinotResponse = postQuery(pqlQuery);
-      Assert.assertTrue(pinotResponse.has("aggregationResults"));
-      TextNode textNode = (TextNode)pinotResponse.get("aggregationResults").get(0).get("value");
-      int result = Integer.valueOf(textNode.textValue());
-      // TODO: see if this can be made more deterministic
-      if (i >= 300) {
-        Assert.assertTrue(result > 0);
-        Assert.assertTrue(result >= prevResult);
-      }
-      prevResult = result;
-      Thread.sleep(10);
+  @Test
+  public void testTextSearchCountQuery()
+      throws Exception {
+    // Keep posting queries until all records are consumed
+    long previousResult = 0;
+    while (getCurrentCountStarResult() < NUM_RECORDS) {
+      long result = getTextColumnQueryResult();
+      assertTrue(result >= previousResult);
+      previousResult = result;
+      Thread.sleep(100);
     }
+
+    // TODO: Fix Lucene index on consuming segments to update the latest records, then uncomment the following part
+//    TestUtils.waitForCondition(aVoid -> {
+//      try {
+//        return getTextColumnQueryResult() == NUM_MATCHING_RECORDS;
+//      } catch (Exception e) {
+//        fail("Caught exception while getting text column query result");
+//        return false;
+//      }
+//    }, 10_000L, "Failed to reach expected number of matching records");
+  }
+
+  private long getTextColumnQueryResult()
+      throws Exception {
+    return postQuery(TEST_TEXT_COLUMN_QUERY).get("aggregationResults").get(0).get("value").asLong();
   }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
index d5fcb5a..8953ea1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
@@ -21,24 +21,23 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.util.SchemaUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.core.util.SchemaUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -71,37 +70,28 @@ public class MapTypeClusterIntegrationTest extends BaseClusterIntegrationTest {
     startBroker();
     startServer();
 
-    // Create the tables
-    addOfflineTable(getTableName());
-
-    // Create and upload segments
-    File avroFile = createAvroFile();
-    Schema schema = new Schema.SchemaBuilder().setSchemaName(getTableName())
+    // Create and upload the schema and table config
+    String rawTableName = getTableName();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(rawTableName)
         .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.STRING)
         .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
         .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.INT)
         .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT).build();
-    FieldSpec intKeyMapJsonStrFieldSpec = new DimensionFieldSpec();
-    intKeyMapJsonStrFieldSpec.setDataType(DataType.STRING);
-    intKeyMapJsonStrFieldSpec.setDefaultNullValue("");
-    intKeyMapJsonStrFieldSpec.setName(INT_KEY_MAP_STR_FIELD_NAME);
-    intKeyMapJsonStrFieldSpec.setTransformFunction("toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")");
-    intKeyMapJsonStrFieldSpec.setSingleValueField(true);
-    schema.addField(intKeyMapJsonStrFieldSpec);
-    FieldSpec stringKeyMapJsonStrFieldSpec = new DimensionFieldSpec();
-    stringKeyMapJsonStrFieldSpec.setDataType(DataType.STRING);
-    stringKeyMapJsonStrFieldSpec.setDefaultNullValue("");
-    stringKeyMapJsonStrFieldSpec.setName(STRING_KEY_MAP_STR_FIELD_NAME );
+    FieldSpec stringKeyMapJsonStrFieldSpec =
+        new DimensionFieldSpec(STRING_KEY_MAP_STR_FIELD_NAME, DataType.STRING, true);
     stringKeyMapJsonStrFieldSpec.setTransformFunction("toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")");
-    stringKeyMapJsonStrFieldSpec.setSingleValueField(true);
     schema.addField(stringKeyMapJsonStrFieldSpec);
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(Collections.singletonList(avroFile), 0, _segmentDir, _tarDir, getTableName(), null, null,
-            schema, executor);
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-    uploadSegments(getTableName(), _tarDir);
+    FieldSpec intKeyMapJsonStrFieldSpec = new DimensionFieldSpec(INT_KEY_MAP_STR_FIELD_NAME, DataType.STRING, true);
+    intKeyMapJsonStrFieldSpec.setTransformFunction("toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")");
+    schema.addField(intKeyMapJsonStrFieldSpec);
+    addSchema(schema);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).build();
+    addTableConfig(tableConfig);
+
+    // Create and upload segments
+    File avroFile = createAvroFile();
+    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(rawTableName, _tarDir);
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(60_000);
@@ -148,12 +138,12 @@ public class MapTypeClusterIntegrationTest extends BaseClusterIntegrationTest {
     JsonNode selectionResults = pinotResponse.get("selectionResults").get("results");
     assertEquals(selectionResults.size(), 10);
     for (int i = 0; i < 10; i++) {
-      assertEquals(selectionResults.get(i).get(0).textValue(), String.format("{\"k1\":%d,\"k2\":100%d}",i,i));
+      assertEquals(selectionResults.get(i).get(0).textValue(), String.format("{\"k1\":%d,\"k2\":100%d}", i, i));
     }
     query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k1', 'INT') FROM " + getTableName();
-     pinotResponse = postQuery(query);
+    pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
-     selectionResults = pinotResponse.get("selectionResults").get("results");
+    selectionResults = pinotResponse.get("selectionResults").get("results");
     assertEquals(selectionResults.size(), 10);
     for (int i = 0; i < 10; i++) {
       assertEquals(Integer.parseInt(selectionResults.get(i).get(0).textValue()), i);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
index a603c89..f3538d5 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
@@ -23,11 +23,8 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -39,112 +36,48 @@ import static org.testng.Assert.assertTrue;
 
 
 /**
- * /**
- * Integration test to check aggregation functions which use DictionaryBasedAggregationPlan and MetadataBasedAggregationPlan
- *
- * <ul>
- *   <li>
- *     Set up the Pinot cluster and create two tables, one with default indexes, one with star tree indexes
- *   </li>
- *   <li>
- *     Send queries to both the tables and check results
- *   </li>
- * </ul>
+ * Integration test to check aggregation functions which use {@code DictionaryBasedAggregationPlanNode} and
+ * {@code MetadataBasedAggregationPlanNode}.
  */
 // TODO: remove this integration test and add unit test for metadata and dictionary based aggregation operator
 public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends BaseClusterIntegrationTest {
-  private static final int NUM_BROKERS = 1;
-  private static final int NUM_SERVERS = 1;
-
-  protected int getNumBrokers() {
-    return NUM_BROKERS;
-  }
-
-  protected int getNumServers() {
-    return NUM_SERVERS;
-  }
-
-  private static final String SCHEMA_FILE_NAME =
-      "On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
-
-  @Override
-  protected String getSchemaFileName() {
-    return SCHEMA_FILE_NAME;
-  }
-
-  private static final String DEFAULT_TABLE_NAME = "myTable";
-  private static final String STAR_TREE_TABLE_NAME = "myStarTable";
-
-  private String _currentTable;
-
-  @Override
-  protected String getTableName() {
-    return _currentTable;
-  }
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
     // Start the Pinot cluster
     startZk();
     startController();
-    startBrokers(getNumBrokers());
-    startServers(getNumServers());
+    startBroker();
+    startServer();
 
-    // Create the tables
-    addOfflineTable(DEFAULT_TABLE_NAME);
-    addOfflineTable(STAR_TREE_TABLE_NAME);
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
 
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Create and upload segments without star tree indexes from Avro data
-    createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false, getRawIndexColumns(), null);
-
-    // Create and upload segments with star tree indexes from Avro data
-    createAndUploadSegments(avroFiles, STAR_TREE_TABLE_NAME, true, null, Schema.fromFile(getSchemaFile()));
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
 
-    // Load data into H2
-    _currentTable = DEFAULT_TABLE_NAME;
-    loadDataIntoH2(avroFiles);
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
-    _currentTable = STAR_TREE_TABLE_NAME;
-    waitForAllDocsLoaded(600_000L);
-  }
-
-  private void loadDataIntoH2(List<File> avroFiles)
-      throws Exception {
-    ExecutorService executor = Executors.newCachedThreadPool();
-    setUpH2Connection(avroFiles, executor);
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-  }
-
-  private void createAndUploadSegments(List<File> avroFiles, String tableName, boolean createStarTreeIndex,
-      List<String> rawIndexColumns, Schema pinotSchema)
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
-
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, null, rawIndexColumns, pinotSchema,
-            executor);
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-
-    uploadSegments(getTableName(), _tarDir);
   }
 
   @Test
   public void testDictionaryBasedQueries()
       throws Exception {
-
+    String tableName = getTableName();
     String pqlQuery;
-    String pqlStarTreeQuery;
     String sqlQuery;
     String sqlQuery1;
     String sqlQuery2;
@@ -153,223 +86,148 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
     // Test queries with min, max, minmaxrange
     // Dictionary columns
     // int
-    pqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MAX(ArrTime) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName;
+    sqlQuery = "SELECT MAX(ArrTime) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ArrTime) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrTime) FROM " + tableName;
+    sqlQuery = "SELECT MIN(ArrTime) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MINMAXRANGE(ArrTime) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MINMAXRANGE(ArrTime) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MINMAXRANGE(ArrTime) FROM " + tableName;
+    sqlQuery = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(ArrTime), MAX(ArrTime), MINMAXRANGE(ArrTime) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ArrTime), MAX(ArrTime), MINMAXRANGE(ArrTime) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery3 = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrTime), MAX(ArrTime), MINMAXRANGE(ArrTime) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(ArrTime) FROM " + tableName;
+    sqlQuery2 = "SELECT MAX(ArrTime) FROM " + tableName;
+    sqlQuery3 = "SELECT MAX(ArrTime)-MIN(ArrTime) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    pqlQuery = "SELECT MIN(ArrTime), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ArrTime), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrTime), COUNT(*) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(ArrTime) FROM " + tableName;
+    sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
     // float
-    pqlQuery = "SELECT MAX(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MAX(DepDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(DepDelayMinutes) FROM " + tableName;
+    sqlQuery = "SELECT MAX(DepDelayMinutes) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(DepDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
+    sqlQuery = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MINMAXRANGE(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MINMAXRANGE(DepDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MINMAXRANGE(DepDelayMinutes) FROM " + tableName;
+    sqlQuery = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery =
-        "SELECT MIN(DepDelayMinutes), MAX(DepDelayMinutes), MINMAXRANGE(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery =
-        "SELECT MIN(DepDelayMinutes), MAX(DepDelayMinutes), MINMAXRANGE(DepDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT MAX(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery3 = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(DepDelayMinutes), MAX(DepDelayMinutes), MINMAXRANGE(DepDelayMinutes) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
+    sqlQuery2 = "SELECT MAX(DepDelayMinutes) FROM " + tableName;
+    sqlQuery3 = "SELECT MAX(DepDelayMinutes)-MIN(DepDelayMinutes) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    pqlQuery = "SELECT MIN(DepDelayMinutes), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(DepDelayMinutes), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(DepDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(DepDelayMinutes), COUNT(*) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(DepDelayMinutes) FROM " + tableName;
+    sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
 
     // double
-    pqlQuery = "SELECT MAX(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MAX(ArrDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(ArrDelayMinutes) FROM " + tableName;
+    sqlQuery = "SELECT MAX(ArrDelayMinutes) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ArrDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
+    sqlQuery = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MINMAXRANGE(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MINMAXRANGE(ArrDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MINMAXRANGE(ArrDelayMinutes) FROM " + tableName;
+    sqlQuery = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery =
-        "SELECT MIN(ArrDelayMinutes), MAX(ArrDelayMinutes), MINMAXRANGE(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery =
-        "SELECT MIN(ArrDelayMinutes), MAX(ArrDelayMinutes), MINMAXRANGE(ArrDelayMinutes) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT MAX(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery3 = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrDelayMinutes), MAX(ArrDelayMinutes), MINMAXRANGE(ArrDelayMinutes) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
+    sqlQuery2 = "SELECT MAX(ArrDelayMinutes) FROM " + tableName;
+    sqlQuery3 = "SELECT MAX(ArrDelayMinutes)-MIN(ArrDelayMinutes) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    pqlQuery = "SELECT MIN(ArrDelayMinutes), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ArrDelayMinutes), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(ArrDelayMinutes) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrDelayMinutes), COUNT(*) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(ArrDelayMinutes) FROM " + tableName;
+    sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
 
     // long
-    pqlQuery = "SELECT MAX(AirlineID) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MAX(AirlineID) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(AirlineID) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(AirlineID) FROM " + tableName;
+    sqlQuery = "SELECT MAX(AirlineID) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(AirlineID) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(AirlineID) FROM " + tableName;
+    sqlQuery = "SELECT MIN(AirlineID) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MINMAXRANGE(AirlineID) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MINMAXRANGE(AirlineID) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MINMAXRANGE(AirlineID) FROM " + tableName;
+    sqlQuery = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(AirlineID), MAX(AirlineID), MINMAXRANGE(AirlineID) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(AirlineID), MAX(AirlineID), MINMAXRANGE(AirlineID) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT MAX(AirlineID) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery3 = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(AirlineID), MAX(AirlineID), MINMAXRANGE(AirlineID) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(AirlineID) FROM " + tableName;
+    sqlQuery2 = "SELECT MAX(AirlineID) FROM " + tableName;
+    sqlQuery3 = "SELECT MAX(AirlineID)-MIN(AirlineID) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    pqlQuery = "SELECT MIN(AirlineID), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(AirlineID), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(AirlineID) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(AirlineID), COUNT(*) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(AirlineID) FROM " + tableName;
+    sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
 
     // string
     // TODO: add test cases for string column when we add support for min and max on string datatype columns
 
     // Non dictionary columns
     // int
-    pqlQuery = "SELECT MAX(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MAX(ActualElapsedTime) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(ActualElapsedTime) FROM " + tableName;
+    sqlQuery = "SELECT MAX(ActualElapsedTime) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ActualElapsedTime) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
+    sqlQuery = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MINMAXRANGE(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MINMAXRANGE(ActualElapsedTime) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MINMAXRANGE(ActualElapsedTime) FROM " + tableName;
+    sqlQuery = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(ActualElapsedTime), MAX(ActualElapsedTime), MINMAXRANGE(ActualElapsedTime) FROM "
-        + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ActualElapsedTime), MAX(ActualElapsedTime), MINMAXRANGE(ActualElapsedTime) FROM "
-        + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT MAX(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery3 = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery =
+        "SELECT MIN(ActualElapsedTime), MAX(ActualElapsedTime), MINMAXRANGE(ActualElapsedTime) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
+    sqlQuery2 = "SELECT MAX(ActualElapsedTime) FROM " + tableName;
+    sqlQuery3 = "SELECT MAX(ActualElapsedTime)-MIN(ActualElapsedTime) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    pqlQuery = "SELECT MIN(ActualElapsedTime), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ActualElapsedTime), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(ActualElapsedTime) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ActualElapsedTime), COUNT(*) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(ActualElapsedTime) FROM " + tableName;
+    sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
 
     // float
-    pqlQuery = "SELECT MAX(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MAX(ArrDelay) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(ArrDelay) FROM " + tableName;
+    sqlQuery = "SELECT MAX(ArrDelay) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ArrDelay) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrDelay) FROM " + tableName;
+    sqlQuery = "SELECT MIN(ArrDelay) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MINMAXRANGE(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MINMAXRANGE(ArrDelay) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MINMAXRANGE(ArrDelay) FROM " + tableName;
+    sqlQuery = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(ArrDelay), MAX(ArrDelay), MINMAXRANGE(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ArrDelay), MAX(ArrDelay), MINMAXRANGE(ArrDelay) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT MAX(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery3 = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrDelay), MAX(ArrDelay), MINMAXRANGE(ArrDelay) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(ArrDelay) FROM " + tableName;
+    sqlQuery2 = "SELECT MAX(ArrDelay) FROM " + tableName;
+    sqlQuery3 = "SELECT MAX(ArrDelay)-MIN(ArrDelay) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    pqlQuery = "SELECT MIN(ArrDelay), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(ArrDelay), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(ArrDelay) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(ArrDelay), COUNT(*) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(ArrDelay) FROM " + tableName;
+    sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
 
     // double
-    pqlQuery = "SELECT MAX(DepDelay) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MAX(DepDelay) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(DepDelay) FROM " + tableName;
+    sqlQuery = "SELECT MAX(DepDelay) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(DepDelay) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(DepDelay) FROM " + tableName;
+    sqlQuery = "SELECT MIN(DepDelay) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MINMAXRANGE(DepDelay) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MINMAXRANGE(DepDelay) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MINMAXRANGE(DepDelay) FROM " + tableName;
+    sqlQuery = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
-    pqlQuery = "SELECT MIN(DepDelay), MAX(DepDelay), MINMAXRANGE(DepDelay) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(DepDelay), MAX(DepDelay), MINMAXRANGE(DepDelay) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT MAX(DepDelay) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery3 = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(DepDelay), MAX(DepDelay), MINMAXRANGE(DepDelay) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(DepDelay) FROM " + tableName;
+    sqlQuery2 = "SELECT MAX(DepDelay) FROM " + tableName;
+    sqlQuery3 = "SELECT MAX(DepDelay)-MIN(DepDelay) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2, sqlQuery3));
-    pqlQuery = "SELECT MIN(DepDelay), COUNT(*) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MIN(DepDelay), COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery1 = "SELECT MIN(DepDelay) FROM " + DEFAULT_TABLE_NAME;
-    sqlQuery2 = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MIN(DepDelay), COUNT(*) FROM " + tableName;
+    sqlQuery1 = "SELECT MIN(DepDelay) FROM " + tableName;
+    sqlQuery2 = "SELECT COUNT(*) FROM " + tableName;
     testQuery(pqlQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
-    testQuery(pqlStarTreeQuery, Lists.newArrayList(sqlQuery1, sqlQuery2));
 
     // string
     // TODO: add test cases for string column when we add support for min and max on string datatype columns
@@ -378,42 +236,42 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
     JsonNode response;
 
     // Dictionary column: answered by DictionaryBasedAggregationOperator
-    pqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName;
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
 
     // Non dictionary column: not answered by DictionaryBasedAggregationOperator
-    pqlQuery = "SELECT MAX(DepDelay) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(DepDelay) FROM " + tableName;
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), response.get("numDocsScanned").asLong());
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
 
     // multiple dictionary based aggregation functions, dictionary columns: answered by DictionaryBasedAggregationOperator
-    pqlQuery = "SELECT MAX(ArrTime),MIN(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(ArrTime),MIN(ArrTime) FROM " + tableName;
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
 
     // multiple aggregation functions, mix of dictionary based and non dictionary based: not answered by DictionaryBasedAggregationOperator
-    pqlQuery = "SELECT MAX(ArrTime),COUNT(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(ArrTime),COUNT(ArrTime) FROM " + tableName;
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), response.get("numDocsScanned").asLong());
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
 
     // group by in query : not answered by DictionaryBasedAggregationOperator
-    pqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME + "  group by DaysSinceEpoch";
+    pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName + "  group by DaysSinceEpoch";
     response = postQuery(pqlQuery);
     assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
 
     // filter in query: not answered by DictionaryBasedAggregationOperator
-    pqlQuery = "SELECT MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME + " where DaysSinceEpoch > 16100";
+    pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName + " where DaysSinceEpoch > 16100";
     response = postQuery(pqlQuery);
     assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
@@ -422,73 +280,54 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
   @Test
   public void testMetadataBasedQueries()
       throws Exception {
-
+    String tableName = getTableName();
     String pqlQuery;
-    String pqlStarTreeQuery;
     String sqlQuery;
 
     // Test queries with count *
-    pqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT COUNT(*) FROM " + tableName;
+    sqlQuery = "SELECT COUNT(*) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
 
     // Test queries with max on time column
-    pqlQuery = "SELECT MAX(DaysSinceEpoch) FROM " + DEFAULT_TABLE_NAME;
-    pqlStarTreeQuery = "SELECT MAX(DaysSinceEpoch) FROM " + STAR_TREE_TABLE_NAME;
-    sqlQuery = "SELECT MAX(DaysSinceEpoch) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT MAX(DaysSinceEpoch) FROM " + tableName;
+    sqlQuery = "SELECT MAX(DaysSinceEpoch) FROM " + tableName;
     testQuery(pqlQuery, Collections.singletonList(sqlQuery));
-    testQuery(pqlStarTreeQuery, Collections.singletonList(sqlQuery));
 
     // Check execution stats
     JsonNode response;
 
-    pqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT COUNT(*) FROM " + tableName;
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
 
-    pqlStarTreeQuery = "SELECT COUNT(*) FROM " + STAR_TREE_TABLE_NAME;
-    response = postQuery(pqlStarTreeQuery);
-    assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
-    assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
-    assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
-
     // group by present in query: not answered by MetadataBasedAggregationOperator
-    pqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME + " GROUP BY DaysSinceEpoch";
+    pqlQuery = "SELECT COUNT(*) FROM " + tableName + " GROUP BY DaysSinceEpoch";
     response = postQuery(pqlQuery);
     assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
 
     // filter present in query: not answered by MetadataBasedAggregationOperator
-    pqlQuery = "SELECT COUNT(*) FROM " + DEFAULT_TABLE_NAME + " WHERE DaysSinceEpoch > 16100";
+    pqlQuery = "SELECT COUNT(*) FROM " + tableName + " WHERE DaysSinceEpoch > 16100";
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
 
     // mixed aggregation functions in query: not answered by MetadataBasedAggregationOperator
-    pqlQuery = "SELECT COUNT(*),MAX(ArrTime) FROM " + DEFAULT_TABLE_NAME;
+    pqlQuery = "SELECT COUNT(*),MAX(ArrTime) FROM " + tableName;
     response = postQuery(pqlQuery);
     assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
-
-    // mixed aggregation functions in star tree query: not answered by MetadataBasedAggregationOperator
-    pqlStarTreeQuery = "SELECT COUNT(*),MAX(DaysSinceEpoch) FROM " + STAR_TREE_TABLE_NAME;
-    response = postQuery(pqlStarTreeQuery);
-    assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
-    assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
-    assertEquals(response.get("totalDocs").asLong(), response.get("numDocsScanned").asLong());
   }
 
   @AfterClass
   public void tearDown()
       throws Exception {
-    dropOfflineTable(DEFAULT_TABLE_NAME);
-    dropOfflineTable(STAR_TREE_TABLE_NAME);
+    dropOfflineTable(getTableName());
 
     stopServer();
     stopBroker();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 14b5bc4..f433a3e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -24,14 +24,10 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.exception.QueryException;
@@ -42,6 +38,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -51,7 +48,10 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 
 /**
@@ -72,18 +72,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
 
-  // For inverted index triggering test
+  // For range index triggering test
   private static final List<String> UPDATED_RANGE_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
   private static final String TEST_UPDATED_RANGE_INDEX_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime > 305";
 
+  // For bloom filter triggering test
   private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier");
   private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
 
   // For default columns test
-  private static final String SCHEMA_WITH_EXTRA_COLUMNS =
+  private static final String SCHEMA_FILE_NAME_WITH_EXTRA_COLUMNS =
       "On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema";
-  private static final String SCHEMA_WITH_MISSING_COLUMNS =
+  private static final String SCHEMA_FILE_NAME_WITH_MISSING_COLUMNS =
       "On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_missing_columns.schema";
   private static final String TEST_DEFAULT_COLUMNS_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE NewAddedIntDimension < 0";
@@ -100,6 +101,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     return NUM_SERVERS;
   }
 
+  private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
+
+  @Override
+  protected String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  // NOTE: Only allow removing default columns for v1 segment
+  @Override
+  protected String getSegmentVersion() {
+    return SegmentVersion.v1.name();
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -111,31 +125,24 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     startBrokers(getNumBrokers());
     startServers(getNumServers());
 
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    ExecutorService executor = Executors.newCachedThreadPool();
-
-    // Create segments from Avro data
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, getTableName(), null, getRawIndexColumns(), null,
-            executor);
-
-    // Load data into H2
-    setUpH2Connection(avroFiles, executor);
-
-    // Initialize query generator
-    setUpQueryGenerator(avroFiles, executor);
-
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
 
-    // Create the table
-    addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
-        getBloomFilterIndexColumns(), getRangeIndexColumns(), getTaskConfig(), null, null);
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
 
-    // Upload all segments
-    uploadSegments(getTableName(), _tarDir);
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
 
     // Set up service status callbacks
     // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
@@ -199,12 +206,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testRefreshTableConfigAndQueryTimeout()
       throws Exception {
-    TableConfig tableConfig = _helixResourceManager.getOfflineTableConfig(getTableName());
-    assertNotNull(tableConfig);
-
     // Set timeout as 5ms so that query will timeout
+    TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(5L));
-    _helixResourceManager.updateTableConfig(tableConfig);
+    updateTableConfig(tableConfig);
 
     // Wait for at most 1 minute for broker to receive and process the table config refresh message
     TestUtils.waitForCondition(aVoid -> {
@@ -228,7 +233,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Remove timeout so that query will finish
     tableConfig.setQueryConfig(null);
-    _helixResourceManager.updateTableConfig(tableConfig);
+    updateTableConfig(tableConfig);
 
     // Wait for at most 1 minute for broker to receive and process the table config refresh message
     TestUtils.waitForCondition(aVoid -> {
@@ -280,16 +285,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testInvertedIndexTriggering()
       throws Exception {
-    final long numTotalDocs = getCountStarResult();
+    long numTotalDocs = getCountStarResult();
 
     JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
     assertEquals(queryResponse.get("numEntriesScannedInFilter").asLong(), numTotalDocs);
 
     // Update table config and trigger reload
-    updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
-        UPDATED_INVERTED_INDEX_COLUMNS, null, null, getTaskConfig(), null, null);
-
-    sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+    TableConfig tableConfig = getOfflineTableConfig();
+    tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
+    updateTableConfig(tableConfig);
+    reloadOfflineTable(getTableName());
 
     TestUtils.waitForCondition(aVoid -> {
       try {
@@ -304,58 +309,55 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testBloomFilterTriggering()
+  public void testRangeIndexTriggering()
       throws Exception {
-    final long numTotalDocs = getCountStarResult();
-    JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
-    assertEquals(queryResponse.get("numSegmentsProcessed").asLong(), NUM_SEGMENTS);
+    long numTotalDocs = getCountStarResult();
 
-    // Update table config and trigger reload
-    updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
-        UPDATED_BLOOM_FILTER_COLUMNS, null, getTaskConfig(), null, null);
+    JsonNode queryResponse = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
+    assertEquals(queryResponse.get("numEntriesScannedInFilter").asLong(), numTotalDocs);
 
-    sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+    // Update table config and trigger reload
+    TableConfig tableConfig = getOfflineTableConfig();
+    tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS);
+    updateTableConfig(tableConfig);
+    reloadOfflineTable(getTableName());
 
     TestUtils.waitForCondition(aVoid -> {
       try {
-        JsonNode queryResponse1 = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+        JsonNode queryResponse1 = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
         // Total docs should not change during reload
         assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
-        return queryResponse1.get("numSegmentsProcessed").asLong() == 0L;
+        return queryResponse1.get("numEntriesScannedInFilter").asLong() < numTotalDocs;
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-    }, 600_000L, "Failed to generate bloom index");
+    }, 600_000L, "Failed to generate range index");
   }
 
   @Test
-  public void testRangeIndexTriggering()
+  public void testBloomFilterTriggering()
       throws Exception {
-    final long numTotalDocs = getCountStarResult();
-    JsonNode queryResponse = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
-    System.out.println("Before queryResponse = " + queryResponse);
-    assertEquals(queryResponse.get("numEntriesScannedInFilter").asLong(), numTotalDocs);
-    long beforeCount = queryResponse.get("aggregationResults").get(0).get("value").asLong();
+    long numTotalDocs = getCountStarResult();
 
-    // Update table config and trigger reload
-    updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null, null,
-        UPDATED_RANGE_INDEX_COLUMNS, getTaskConfig(), null, null);
+    JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+    assertEquals(queryResponse.get("numSegmentsProcessed").asLong(), NUM_SEGMENTS);
 
-    sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+    // Update table config and trigger reload
+    TableConfig tableConfig = getOfflineTableConfig();
+    tableConfig.getIndexingConfig().setBloomFilterColumns(UPDATED_BLOOM_FILTER_COLUMNS);
+    updateTableConfig(tableConfig);
+    reloadOfflineTable(getTableName());
 
     TestUtils.waitForCondition(aVoid -> {
       try {
-        JsonNode queryResponse1 = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
-        System.out.println("After queryResponse = " + queryResponse);
+        JsonNode queryResponse1 = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY);
+        // Total docs should not change during reload
         assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
-        long afterCount = queryResponse.get("aggregationResults").get(0).get("value").asLong();
-        //we should be scanning less than numTotalDocs with index enabled.
-        //In the current implementation its 8785, but it
-        return beforeCount == afterCount && queryResponse1.get("numEntriesScannedInFilter").asLong() < numTotalDocs;
+        return queryResponse1.get("numSegmentsProcessed").asLong() == 0L;
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-    }, 600_000L, "Failed to generate inverted index");
+    }, 600_000L, "Failed to generate bloom filter");
   }
 
   /**
@@ -392,18 +394,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(queryResponse.get("selectionResults").get("columns").size(), 79);
   }
 
-  private void reloadDefaultColumns(final boolean withExtraColumns)
+  private void reloadDefaultColumns(boolean withExtraColumns)
       throws Exception {
-    final long numTotalDocs = getCountStarResult();
+    long numTotalDocs = getCountStarResult();
 
     if (withExtraColumns) {
-      sendSchema(SCHEMA_WITH_EXTRA_COLUMNS);
+      _schemaFileName = SCHEMA_FILE_NAME_WITH_EXTRA_COLUMNS;
+      addSchema(createSchema());
     } else {
-      sendSchema(SCHEMA_WITH_MISSING_COLUMNS);
+      _schemaFileName = SCHEMA_FILE_NAME_WITH_MISSING_COLUMNS;
+      addSchema(createSchema());
     }
 
     // Trigger reload
-    sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+    reloadOfflineTable(getTableName());
 
     String errorMessage;
     if (withExtraColumns) {
@@ -429,14 +433,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }, 600_000L, errorMessage);
   }
 
-  private void sendSchema(String resourceName)
-      throws Exception {
-    URL resource = OfflineClusterIntegrationTest.class.getClassLoader().getResource(resourceName);
-    assertNotNull(resource);
-    File schemaFile = new File(resource.getFile());
-    addSchema(schemaFile, getTableName());
-  }
-
   private void testNewAddedColumns()
       throws Exception {
     long numTotalDocs = getCountStarResult();
@@ -926,24 +922,17 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testCaseInsensitivity()
-      throws Exception {
-    addSchema(getSchemaFile(), getTableName());
-    List<String> queries = new ArrayList<>();
+  public void testCaseInsensitivity() {
     int daysSinceEpoch = 16138;
     long secondsSinceEpoch = 16138 * 24 * 60 * 60;
-    queries.add("SELECT * FROM mytable");
-    queries.add("SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable");
-    queries.add(
-        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000");
-    queries.add(
-        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000");
-    queries.add("SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch);
-    queries
-        .add("SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch);
-    queries.add("SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch);
-    queries.add("SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable");
-    queries.add(
+    List<String> queries = Arrays.asList("SELECT * FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
+        "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
+        "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
         "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
     queries.replaceAll(query -> query.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"));
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
deleted file mode 100644
index 6e99f30..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.io.FileUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-/**
- * Tests the URI upload path through a local file uri.
- */
-public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PinotURIUploadIntegrationTest.class);
-  private String _tableName;
-  private File _metadataDir = new File(_segmentDir, "tmpMeta");
-
-  @Nonnull
-  @Override
-  protected String getTableName() {
-    return _tableName;
-  }
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteQuietly(_metadataDir);
-    FileUtils.deleteQuietly(new File(_metadataDir.getAbsolutePath() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
-    // Start an empty Pinot cluster
-    startZk();
-    startController();
-    startBroker();
-    startServer();
-  }
-
-  @BeforeMethod
-  public void setupMethod(Object[] args)
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-    if (args == null || args.length == 0) {
-      return;
-    }
-    _tableName = (String) args[0];
-    SegmentVersion version = (SegmentVersion) args[1];
-    addOfflineTable(_tableName, version);
-  }
-
-  @AfterMethod
-  public void teardownMethod()
-      throws Exception {
-    if (_tableName != null) {
-      dropOfflineTable(_tableName);
-    }
-  }
-
-  private File generateRandomSegment(String segmentName, int rowCount)
-      throws Exception {
-    ThreadLocalRandom random = ThreadLocalRandom.current();
-    Schema schema = new Schema.Parser()
-        .parse(new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc"))));
-    GenericRecord record = new GenericData.Record(schema);
-    GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
-    DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
-    File avroFile = new File(_tempDir, segmentName + ".avro");
-    fileWriter.create(schema, avroFile);
-
-    for (int i = 0; i < rowCount; i++) {
-      record.put(0, random.nextInt());
-      fileWriter.append(record);
-    }
-
-    fileWriter.close();
-
-    int segmentIndex = Integer.parseInt(segmentName.split("_")[1]);
-
-    File segmentTarDir = new File(_tarDir, segmentName);
-    TestUtils.ensureDirectoriesExistAndEmpty(segmentTarDir);
-    ExecutorService executor = MoreExecutors.newDirectExecutorService();
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(Collections.singletonList(avroFile), segmentIndex, new File(_segmentDir, segmentName),
-            segmentTarDir, this._tableName, executor);
-    executor.shutdown();
-    executor.awaitTermination(1L, TimeUnit.MINUTES);
-
-    FileUtils.forceDelete(avroFile);
-    return new File(_tarDir, segmentName);
-  }
-
-  @DataProvider(name = "configProvider")
-  public Object[][] configProvider() {
-    Object[][] configs = {{"mytable", SegmentVersion.v1}, {"yourtable", SegmentVersion.v3}};
-    return configs;
-  }
-
-  @Test(dataProvider = "configProvider")
-  public void testRefresh(String tableName, SegmentVersion version)
-      throws Exception {
-    final String segment6 = "segmentToBeRefreshed_6";
-    final int nRows1 = 69;
-    File segmentTarDir = generateRandomSegment(segment6, nRows1);
-    uploadSegmentsDirectly(segmentTarDir);
-    verifyNRows(0, nRows1);
-    FileUtils.forceDelete(segmentTarDir);
-  }
-
-  // Verify that the number of rows is either the initial value or the final value but not something else.
-  private void verifyNRows(int currentNrows, int finalNrows)
-      throws Exception {
-    int attempt = 0;
-    long sleepTime = 100;
-    long nRows;
-    while (attempt < 10) {
-      Thread.sleep(sleepTime);
-      try {
-        nRows = getCurrentCountStarResult();
-      } catch (Exception e) {
-        nRows = -1;
-      }
-      //nRows can either be the current value or the final value, not any other.
-      if (nRows == currentNrows || nRows == -1) {
-        sleepTime *= 2;
-        attempt++;
-      } else if (nRows == finalNrows) {
-        return;
-      } else {
-        Assert.fail("Found unexpected number of rows " + nRows);
-      }
-    }
-    Assert.fail("Failed to get from " + currentNrows + " to " + finalNrows);
-  }
-
-  @AfterClass
-  public void tearDown() {
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-    FileUtils.deleteQuietly(_tempDir);
-    FileUtils.deleteQuietly(_metadataDir);
-    FileUtils.deleteQuietly(new File(_metadataDir.getAbsolutePath() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
-  }
-
-  /**
-   * Upload all segments inside the given directory to the cluster.
-   *
-   * @param segmentDir Segment directory
-   */
-  private void uploadSegmentsDirectly(@Nonnull File segmentDir)
-      throws Exception {
-    String[] segmentNames = segmentDir.list();
-    Assert.assertNotNull(segmentNames);
-
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      // Upload all segments in parallel
-      int numSegments = segmentNames.length;
-      ExecutorService executor = Executors.newFixedThreadPool(numSegments);
-      List<Future<Integer>> tasks = new ArrayList<>(numSegments);
-      for (final String segmentName : segmentNames) {
-        File segmentFile = new File(segmentDir, segmentName);
-        String downloadUri = segmentFile.toURI().toString();
-        final List<Header> httpHeaders = null;
-
-        tasks.add(executor.submit(new Callable<Integer>() {
-          @Override
-          public Integer call()
-              throws Exception {
-            List<NameValuePair> parameters = Collections.singletonList(
-                new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, getTableName()));
-
-            return fileUploadDownloadClient
-                .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort),
-                    downloadUri, httpHeaders, parameters, 60 * 1000).getStatusCode();
-          }
-        }));
-      }
-      for (Future<Integer> task : tasks) {
-        Assert.assertEquals((int) task.get(), HttpStatus.SC_OK);
-      }
-      Assert.assertTrue(getAllSegments(getTableName()).size() == 1);
-
-      executor.shutdown();
-    }
-  }
-
-  private List<String> getAllSegments(String tableName)
-      throws IOException {
-    List<String> allSegments = new ArrayList<>();
-    HttpHost controllerHttpHost = new HttpHost("localhost", _controllerPort);
-    HttpClient controllerClient = new DefaultHttpClient();
-    HttpGet req = new HttpGet("/segments/" + tableName);
-    HttpResponse res = controllerClient.execute(controllerHttpHost, req);
-    try {
-      if (res.getStatusLine().getStatusCode() != 200) {
-        throw new IllegalStateException(res.getStatusLine().toString());
-      }
-      InputStream content = res.getEntity().getContent();
-      JsonNode segmentsData = JsonUtils.inputStreamToJsonNode(content);
-
-      if (segmentsData != null) {
-        JsonNode offlineSegments = segmentsData.get(0).get("OFFLINE");
-        if (offlineSegments != null) {
-          for (JsonNode segment : offlineSegments) {
-            allSegments.add(segment.asText());
-          }
-        }
-      }
-      LOGGER.info("All segments : {}", allSegments);
-    } finally {
-      if (res.getEntity() != null) {
-        EntityUtils.consume(res.getEntity());
-      }
-    }
-    return allSegments;
-  }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 6db9137..6e7636a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -21,9 +21,6 @@ package org.apache.pinot.integration.tests;
 import java.io.File;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -53,22 +50,18 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    ExecutorService executor = Executors.newCachedThreadPool();
+    // Create and upload the schema and table config
+    addSchema(createSchema());
+    addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
 
-    // Push data into the Kafka topic
-    pushAvroIntoKafka(avroFiles, getKafkaTopic(), executor);
+    // Push data into Kafka
+    pushAvroIntoKafka(avroFiles);
 
-    // Load data into H2
-    setUpH2Connection(avroFiles, executor);
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
 
-    // Initialize query generator
-    setUpQueryGenerator(avroFiles, executor);
-
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-
-    // Create Pinot table
-    setUpRealtimeTable(avroFiles.get(0));
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
index 0dc4560..a03ca98 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
@@ -60,6 +60,11 @@ public class SegmentCompletionIntegrationTest extends BaseClusterIntegrationTest
   private String _currentSegment;
 
   @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Override
   protected int getNumKafkaPartitions() {
     return NUM_KAFKA_PARTITIONS;
   }
@@ -76,13 +81,9 @@ public class SegmentCompletionIntegrationTest extends BaseClusterIntegrationTest
     // Start Kafka
     startKafka();
 
-    // Create Pinot table
-    setUpRealtimeTable(null);
-  }
-
-  @Override
-  protected boolean useLlc() {
-    return true;
+    // Create and upload the schema and table config
+    addSchema(createSchema());
+    addTableConfig(createRealtimeTableConfig(null));
   }
 
   /**
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 3416164..a0be8ed 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -31,7 +31,6 @@ import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.minion.events.MinionEventObserver;
 import org.apache.pinot.minion.events.MinionEventObserverFactory;
@@ -42,6 +41,7 @@ import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -84,11 +84,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     // Add 3 offline tables, where 2 of them have TestTask enabled
     TableTaskConfig taskConfig =
         new TableTaskConfig(Collections.singletonMap(TestTaskGenerator.TASK_TYPE, Collections.emptyMap()));
-    addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, null, taskConfig, null,
-        null);
-    addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, null, taskConfig, null,
-        null);
-    addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null, null, null, null);
+    addTableConfig(
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setTaskConfig(taskConfig).build());
+    addTableConfig(
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_2).setTaskConfig(taskConfig).build());
+    addTableConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_3).build());
 
     _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
index ccb2e17..7b80f71 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
@@ -23,17 +23,12 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
-import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.tools.query.comparison.QueryComparison;
 import org.apache.pinot.tools.query.comparison.SegmentInfoProvider;
@@ -73,32 +68,30 @@ public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest {
           AggregationFunctionType.SUM, AggregationFunctionType.AVG, AggregationFunctionType.MINMAXRANGE);
   private static final int NUM_QUERIES_TO_GENERATE = 100;
 
-  private List<String> _starTree1Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
-  private List<String> _starTree2Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
-  private List<String> _starTree1Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
-  private List<String> _starTree2Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
+  private String _currentTable;
   private StarTreeQueryGenerator _starTree1QueryGenerator;
   private StarTreeQueryGenerator _starTree2QueryGenerator;
 
-  private Schema _schema;
-  private String _currentTable;
-
-  @Nonnull
   @Override
   protected String getTableName() {
     return _currentTable;
   }
 
-  @Nonnull
   @Override
   protected String getSchemaFileName() {
     return SCHEMA_FILE_NAME;
   }
 
+  // NOTE: Star-Tree and SegmentInfoProvider does not work on no-dictionary dimensions
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay");
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
     // Start the Pinot cluster
     startZk();
@@ -106,87 +99,73 @@ public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest {
     startBroker();
     startServers(2);
 
-    // Create the tables
-    addOfflineTable(DEFAULT_TABLE_NAME);
-    addOfflineTable(STAR_TREE_TABLE_NAME);
-
-    // Set up segments and query generator
-    _schema = Schema.fromFile(getSchemaFile());
-    setUpSegmentsAndQueryGenerator();
-
-    // Wait for all documents loaded
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
     _currentTable = DEFAULT_TABLE_NAME;
-    waitForAllDocsLoaded(600_000L);
-    _currentTable = STAR_TREE_TABLE_NAME;
-    waitForAllDocsLoaded(600_000L);
-  }
+    TableConfig defaultTableConfig = createOfflineTableConfig();
+    addTableConfig(defaultTableConfig);
 
-  private void setUpSegmentsAndQueryGenerator()
-      throws Exception {
-    // Randomly pick some dimensions and metrics for star-tree V2
-    List<String> allDimensions = new ArrayList<>(_schema.getDimensionNames());
+    // Randomly pick some dimensions and metrics for star-trees
+    List<String> starTree1Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
+    List<String> starTree2Dimensions = new ArrayList<>(NUM_STAR_TREE_DIMENSIONS);
+    List<String> allDimensions = new ArrayList<>(schema.getDimensionNames());
     Collections.shuffle(allDimensions);
     for (int i = 0; i < NUM_STAR_TREE_DIMENSIONS; i++) {
-      _starTree1Dimensions.add(allDimensions.get(2 * i));
-      _starTree2Dimensions.add(allDimensions.get(2 * i + 1));
+      starTree1Dimensions.add(allDimensions.get(2 * i));
+      starTree2Dimensions.add(allDimensions.get(2 * i + 1));
     }
-    List<String> allMetrics = new ArrayList<>(_schema.getMetricNames());
+    List<String> starTree1Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
+    List<String> starTree2Metrics = new ArrayList<>(NUM_STAR_TREE_METRICS);
+    List<String> allMetrics = new ArrayList<>(schema.getMetricNames());
     Collections.shuffle(allMetrics);
     for (int i = 0; i < NUM_STAR_TREE_METRICS; i++) {
-      _starTree1Metrics.add(allMetrics.get(2 * i));
-      _starTree2Metrics.add(allMetrics.get(2 * i + 1));
+      starTree1Metrics.add(allMetrics.get(2 * i));
+      starTree2Metrics.add(allMetrics.get(2 * i + 1));
     }
+    _currentTable = STAR_TREE_TABLE_NAME;
+    TableConfig starTreeTableConfig = createOfflineTableConfig();
+    starTreeTableConfig.getIndexingConfig().setStarTreeIndexConfigs(Arrays
+        .asList(getStarTreeIndexConfig(starTree1Dimensions, starTree1Metrics),
+            getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics)));
+    addTableConfig(starTreeTableConfig);
 
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Create and upload segments without star tree indexes from Avro data
-    createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false);
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, defaultTableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
+    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, starTreeTableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(STAR_TREE_TABLE_NAME, _tarDir);
 
-    // Initialize the query generator using segments without star tree indexes
-    SegmentInfoProvider segmentInfoProvider = new SegmentInfoProvider(_tarDir.getAbsolutePath());
+    // Set up the query generators
+    SegmentInfoProvider segmentInfoProvider = new SegmentInfoProvider(_tarDir.getPath());
     List<String> aggregationFunctions = new ArrayList<>(AGGREGATION_FUNCTION_TYPES.size());
     for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
       aggregationFunctions.add(functionType.getName());
     }
-    _starTree1QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, _starTree1Dimensions, _starTree1Metrics,
+    _starTree1QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, starTree1Dimensions, starTree1Metrics,
         segmentInfoProvider.getSingleValueDimensionValuesMap(), aggregationFunctions);
-    _starTree2QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, _starTree2Dimensions, _starTree2Metrics,
+    _starTree2QueryGenerator = new StarTreeQueryGenerator(STAR_TREE_TABLE_NAME, starTree2Dimensions, starTree2Metrics,
         segmentInfoProvider.getSingleValueDimensionValuesMap(), aggregationFunctions);
 
-    // Create and upload segments with star tree indexes from Avro data
-    createAndUploadSegments(avroFiles, STAR_TREE_TABLE_NAME, true);
-  }
-
-  private void createAndUploadSegments(List<File> avroFiles, String tableName, boolean createStarTreeIndex)
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
-
-    List<StarTreeV2BuilderConfig> starTreeV2BuilderConfigs = null;
-    if (createStarTreeIndex) {
-      starTreeV2BuilderConfigs = Arrays.asList(getBuilderConfig(_starTree1Dimensions, _starTree1Metrics),
-          getBuilderConfig(_starTree2Dimensions, _starTree2Metrics));
-    }
-
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, starTreeV2BuilderConfigs, null, _schema,
-            executor);
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-
-    uploadSegments(getTableName(), _tarDir);
+    // Wait for all documents loaded
+    _currentTable = DEFAULT_TABLE_NAME;
+    waitForAllDocsLoaded(600_000L);
+    _currentTable = STAR_TREE_TABLE_NAME;
+    waitForAllDocsLoaded(600_000L);
   }
 
-  private static StarTreeV2BuilderConfig getBuilderConfig(List<String> dimensions, List<String> metrics) {
-    Set<AggregationFunctionColumnPair> functionColumnPairs = new HashSet<>();
+  private static StarTreeIndexConfig getStarTreeIndexConfig(List<String> dimensions, List<String> metrics) {
+    List<String> functionColumnPairs = new ArrayList<>();
     for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
       for (String metric : metrics) {
-        functionColumnPairs.add(new AggregationFunctionColumnPair(functionType, metric));
+        functionColumnPairs.add(new AggregationFunctionColumnPair(functionType, metric).toColumnName());
       }
     }
-    return new StarTreeV2BuilderConfig.Builder().setDimensionsSplitOrder(dimensions)
-        .setFunctionColumnPairs(functionColumnPairs).setMaxLeafRecords(10).build();
+    return new StarTreeIndexConfig(dimensions, null, functionColumnPairs, 0);
   }
 
   @Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java
deleted file mode 100644
index fabceae..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.File;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-/**
- * Test that uploads, refreshes and deletes segments from multiple threads and checks that the row count in Pinot
- * matches with the expected count.
- *
- * @author jfim
- */
-// TODO: clean up this test
-public class UploadRefreshDeleteIntegrationTest extends BaseClusterIntegrationTest {
-  private static final Logger LOGGER = LoggerFactory.getLogger(UploadRefreshDeleteIntegrationTest.class);
-  private String _tableName;
-
-  @Nonnull
-  @Override
-  protected String getTableName() {
-    return _tableName;
-  }
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    // Start an empty Pinot cluster
-    startZk();
-    startController();
-    startBroker();
-    startServer();
-  }
-
-  @BeforeMethod
-  public void setupMethod(Object[] args)
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-    if (args == null || args.length == 0) {
-      return;
-    }
-    _tableName = (String) args[0];
-    SegmentVersion version = (SegmentVersion) args[1];
-    addOfflineTable(_tableName, version);
-  }
-
-  @AfterMethod
-  public void teardownMethod()
-      throws Exception {
-    if (_tableName != null) {
-      dropOfflineTable(_tableName);
-    }
-  }
-
-  protected void generateAndUploadRandomSegment(String segmentName, int rowCount)
-      throws Exception {
-    ThreadLocalRandom random = ThreadLocalRandom.current();
-    Schema schema = new Schema.Parser()
-        .parse(new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc"))));
-    GenericRecord record = new GenericData.Record(schema);
-    GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
-    DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
-    File avroFile = new File(_tempDir, segmentName + ".avro");
-    fileWriter.create(schema, avroFile);
-
-    for (int i = 0; i < rowCount; i++) {
-      record.put(0, random.nextInt());
-      fileWriter.append(record);
-    }
-
-    fileWriter.close();
-
-    int segmentIndex = Integer.parseInt(segmentName.split("_")[1]);
-
-    File segmentTarDir = new File(_tarDir, segmentName);
-    TestUtils.ensureDirectoriesExistAndEmpty(segmentTarDir);
-    ExecutorService executor = MoreExecutors.newDirectExecutorService();
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(Collections.singletonList(avroFile), segmentIndex, new File(_segmentDir, segmentName),
-            segmentTarDir, this._tableName, executor);
-    executor.shutdown();
-    executor.awaitTermination(1L, TimeUnit.MINUTES);
-
-    uploadSegments(getTableName(), segmentTarDir);
-
-    FileUtils.forceDelete(avroFile);
-    FileUtils.forceDelete(segmentTarDir);
-  }
-
-  @DataProvider(name = "configProvider")
-  public Object[][] configProvider() {
-    Object[][] configs = {{"mytable", SegmentVersion.v1}, {"yourtable", SegmentVersion.v3}};
-    return configs;
-  }
-
-  @Test(dataProvider = "configProvider")
-  public void testRefresh(String tableName, SegmentVersion version)
-      throws Exception {
-    final int nAtttempts = 5;
-    final String segment6 = "segmentToBeRefreshed_6";
-    final int nRows1 = 69;
-    generateAndUploadRandomSegment(segment6, nRows1);
-    verifyNRows(0, nRows1);
-    final int nRows2 = 198;
-    LOGGER.info("Segment {} loaded with {} rows, refreshing with {}", segment6, nRows1, nRows2);
-    generateAndUploadRandomSegment(segment6, nRows2);
-    verifyNRows(nRows1, nRows2);
-    // Load another segment while keeping this one in place.
-    final String segment9 = "newSegment_9";
-    final int nRows3 = 102;
-    generateAndUploadRandomSegment(segment9, nRows3);
-    verifyNRows(nRows2, nRows2 + nRows3);
-  }
-
-  // Verify that the number of rows is either the initial value or the final value but not something else.
-  private void verifyNRows(int currentNrows, int finalNrows)
-      throws Exception {
-    int attempt = 0;
-    long sleepTime = 100;
-    long nRows;
-    while (attempt < 10) {
-      Thread.sleep(sleepTime);
-      try {
-        nRows = getCurrentCountStarResult();
-      } catch (Exception e) {
-        nRows = -1;
-      }
-      //nRows can either be the current value or the final value, not any other.
-      if (nRows == currentNrows || nRows == -1) {
-        sleepTime *= 2;
-        attempt++;
-      } else if (nRows == finalNrows) {
-        return;
-      } else {
-        Assert.fail("Found unexpected number of rows " + nRows);
-      }
-    }
-    Assert.fail("Failed to get from " + currentNrows + " to " + finalNrows);
-  }
-
-  @Test(enabled = false, dataProvider = "configProvider")
-  public void testUploadRefreshDelete(String tableName, SegmentVersion version)
-      throws Exception {
-    final int THREAD_COUNT = 1;
-    final int SEGMENT_COUNT = 5;
-
-    final int MIN_ROWS_PER_SEGMENT = 500;
-    final int MAX_ROWS_PER_SEGMENT = 1000;
-
-    final int OPERATIONS_PER_ITERATION = 10;
-    final int ITERATION_COUNT = 5;
-
-    final double UPLOAD_PROBABILITY = 0.8d;
-
-    final String[] segmentNames = new String[SEGMENT_COUNT];
-    final int[] segmentRowCounts = new int[SEGMENT_COUNT];
-
-    for (int i = 0; i < SEGMENT_COUNT; i++) {
-      segmentNames[i] = "segment_" + i;
-      segmentRowCounts[i] = 0;
-    }
-
-    for (int i = 0; i < ITERATION_COUNT; i++) {
-      // Create THREAD_COUNT threads
-      ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
-
-      // Submit OPERATIONS_PER_ITERATION uploads/deletes
-      for (int j = 0; j < OPERATIONS_PER_ITERATION; j++) {
-        executorService.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              ThreadLocalRandom random = ThreadLocalRandom.current();
-
-              // Pick a random segment
-              int segmentIndex = random.nextInt(SEGMENT_COUNT);
-              String segmentName = segmentNames[segmentIndex];
-
-              // Pick a random operation
-              if (random.nextDouble() < UPLOAD_PROBABILITY) {
-                // Upload this segment
-                LOGGER.info("Will upload segment {}", segmentName);
-
-                synchronized (segmentName) {
-                  // Create a segment with a random number of rows
-                  int segmentRowCount = random.nextInt(MIN_ROWS_PER_SEGMENT, MAX_ROWS_PER_SEGMENT);
-                  LOGGER.info("Generating and uploading segment {} with {} rows", segmentName, segmentRowCount);
-                  generateAndUploadRandomSegment(segmentName, segmentRowCount);
-
-                  // Store the number of rows
-                  LOGGER.info("Uploaded segment {} with {} rows", segmentName, segmentRowCount);
-                  segmentRowCounts[segmentIndex] = segmentRowCount;
-                }
-              } else {
-                // Delete this segment
-                LOGGER.info("Will delete segment {}", segmentName);
-
-                synchronized (segmentName) {
-                  // Delete this segment
-                  LOGGER.info("Deleting segment {}", segmentName);
-                  String reply = sendDeleteRequest(_controllerRequestURLBuilder.
-                      forSegmentDelete("myresource", segmentName));
-                  LOGGER.info("Deletion returned {}", reply);
-
-                  // Set the number of rows to zero
-                  LOGGER.info("Deleted segment {}", segmentName);
-                  segmentRowCounts[segmentIndex] = 0;
-                }
-              }
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-          }
-        });
-      }
-
-      // Await for all tasks to complete
-      executorService.shutdown();
-      executorService.awaitTermination(5L, TimeUnit.MINUTES);
-
-      // Count number of expected rows
-      int expectedRowCount = 0;
-      for (int segmentRowCount : segmentRowCounts) {
-        expectedRowCount += segmentRowCount;
-      }
-
-      // Wait for up to one minute for the row count to match the expected row count
-      LOGGER.info("Awaiting for the row count to match {}", expectedRowCount);
-      int pinotRowCount = (int) getCurrentCountStarResult();
-      long timeInOneMinute = System.currentTimeMillis() + 60 * 1000L;
-      while (System.currentTimeMillis() < timeInOneMinute && pinotRowCount != expectedRowCount) {
-        LOGGER.info("Row count is {}, expected {}, awaiting for row count to match", pinotRowCount, expectedRowCount);
-        Thread.sleep(5000L);
-
-        try {
-          pinotRowCount = (int) getCurrentCountStarResult();
-        } catch (Exception e) {
-          LOGGER.warn("Caught exception while sending query to Pinot, retrying", e);
-        }
-      }
-
-      // Compare row counts
-      Assert.assertEquals(pinotRowCount, expectedRowCount,
-          "Expected and actual row counts don't match after waiting one minute");
-    }
-  }
-
-  @AfterClass
-  public void tearDown() {
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-    FileUtils.deleteQuietly(_tempDir);
-  }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/server/util/SegmentTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/server/util/SegmentTestUtils.java
deleted file mode 100644
index 2a3e88f..0000000
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/server/util/SegmentTestUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.FileFormat;
-import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
-import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-
-
-public class SegmentTestUtils {
-  private SegmentTestUtils() {
-  }
-
-  @Nonnull
-  public static SegmentGeneratorConfig getSegmentGeneratorConfig(@Nonnull File inputAvro, @Nonnull File outputDir,
-      @Nonnull TimeUnit timeUnit, @Nonnull String tableName, @Nullable Schema pinotSchema)
-      throws IOException {
-    if (pinotSchema == null) {
-      pinotSchema = AvroUtils.getPinotSchemaFromAvroDataFile(inputAvro);
-    }
-    TableConfigBuilder tableConfigBuilder = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName);
-    // TODO: change to getDateTimeFieldSpecs().get(0)
-    if (pinotSchema.getTimeFieldSpec() != null) {
-      tableConfigBuilder.setTimeColumnName(pinotSchema.getTimeFieldSpec().getName());
-    }
-    TableConfig tableConfig = tableConfigBuilder.build();
-    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, pinotSchema);
-
-    segmentGeneratorConfig.setInputFilePath(inputAvro.getAbsolutePath());
-    segmentGeneratorConfig.setSegmentTimeUnit(timeUnit);
-    if (inputAvro.getName().endsWith("gz")) {
-      segmentGeneratorConfig.setFormat(FileFormat.GZIPPED_AVRO);
-    } else {
-      segmentGeneratorConfig.setFormat(FileFormat.AVRO);
-    }
-    segmentGeneratorConfig.setSegmentVersion(SegmentVersion.v1);
-    segmentGeneratorConfig.setTableName(tableName);
-    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
-
-    return segmentGeneratorConfig;
-  }
-}
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 74ffc21..46fa2d9 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -94,7 +94,6 @@
     <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.openjdk.jmh</groupId>
@@ -174,10 +173,6 @@
               <name>pinot-RawIndexBenchmark</name>
             </program>
             <program>
-              <mainClass>org.apache.pinot.perf.RealtimeStressTest</mainClass>
-              <name>pinot-RealtimeStressTest</name>
-            </program>
-            <program>
               <mainClass>org.apache.pinot.perf.StringDictionaryPerfTest</mainClass>
               <name>pinot-StringDictionaryPerfTest</name>
             </program>
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
index 77e781f..e3196cd 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
@@ -19,16 +19,10 @@
 package org.apache.pinot.perf;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 
@@ -37,13 +31,9 @@ import org.apache.pinot.util.TestUtils;
  * Benchmark that writes a configurable amount of rows in Kafka and checks how much time it takes to consume all of
  * them.
  */
-public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegrationTest {
+public class BenchmarkRealtimeConsumptionSpeed extends BaseClusterIntegrationTest {
   private static final int ROW_COUNT = 100_000;
-  private static final int ROW_COUNT_FOR_SEGMENT_FLUSH = 10_000;
   private static final long TIMEOUT_MILLIS = 20 * 60 * 1000L; // Twenty minutes
-  private final File _tmpDir = new File("/tmp/" + getHelixClusterName());
-  private static final int SEGMENT_COUNT = 1;
-  private static final Random RANDOM = new Random(123456L);
 
   public static void main(String[] args) {
     try {
@@ -56,50 +46,34 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
 
   private void runBenchmark()
       throws Exception {
-    // Start ZK and Kafka
-    startZk();
-    StreamDataServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-
-    // Create Kafka topic
-    kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(10));
-
-    // Unpack data (needed to get the Avro schema)
-    TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
-        RealtimeClusterIntegrationTest.class.getClassLoader()
-            .getResource("On_Time_On_Time_Performance_2014_100k_subset_nonulls.tar.gz"))), _tmpDir);
-
-    _tmpDir.mkdirs();
-    final List<File> avroFiles = new ArrayList<File>(SEGMENT_COUNT);
-    for (int segmentNumber = 1; segmentNumber <= SEGMENT_COUNT; ++segmentNumber) {
-      avroFiles.add(new File(_tmpDir.getPath() + "/On_Time_On_Time_Performance_2014_" + segmentNumber + ".avro"));
-    }
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
 
     // Start the Pinot cluster
+    startZk();
     startController();
     startBroker();
     startServer();
 
-    // Create realtime table
-    setUpRealtimeTable(avroFiles.get(0));
+    // Start Kafka
+    startKafka();
 
-    // Wait a couple of seconds for all Helix state transitions to happen
-    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+    // Unpack the Avro files
+    File avroFile = unpackAvroData(_tempDir).get(0);
+
+    // Create and upload the schema and table config
+    addSchema(createSchema());
+    addTableConfig(createRealtimeTableConfig(avroFile));
 
     // Generate ROW_COUNT rows and write them into Kafka
-    new Thread() {
-      @Override
-      public void run() {
-        try {
-          ClusterIntegrationTestUtils
-              .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
-                  ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
-        } catch (Exception e) {
-          // Ignored
-        }
+    new Thread(() -> {
+      try {
+        ClusterIntegrationTestUtils
+            .pushRandomAvroIntoKafka(avroFile, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT,
+                getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
+      } catch (Exception e) {
+        // Ignored
       }
-    }.start();
+    }).start();
 
     // Count how many seconds it takes for select count(*) to match with ROW_COUNT
     long startTime = System.currentTimeMillis();
@@ -127,5 +101,6 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
     long endTime = System.currentTimeMillis();
 
     System.out.println("Consumed " + ROW_COUNT + " rows in " + (endTime - startTime) / 1000.0 + " seconds");
+    FileUtils.deleteDirectory(_tempDir);
   }
 }
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
deleted file mode 100644
index 7d239df..0000000
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.perf;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.spi.stream.StreamDataServerStartable;
-import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.integration.tests.OfflineClusterIntegrationTest;
-import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.apache.pinot.util.TestUtils;
-
-
-/**
- * Stress test that writes an infinite amount of data in Kafka for a given duration until Pinot breaks.
- */
-public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
-  private static final int ROW_COUNT = 100_000;
-  private static final int MIN_ROW_COUNT = 100_000;
-  private static final int ROW_COUNT_FOR_SEGMENT_FLUSH = 10_000;
-  private static final long TIMEOUT_MILLIS = 20 * 60 * 1000L; // Twenty minutes
-  private final File _tmpDir = new File("/tmp/" + getHelixClusterName());
-  private static final int SEGMENT_COUNT = 1;
-  private static final Random RANDOM = new Random(123456L);
-  private static long rowsWritten = 0L;
-
-  public static void main(String[] args) {
-    try {
-      new RealtimeStressTest().runBenchmark();
-    } catch (Exception e) {
-      System.exit(-1);
-    }
-  }
-
-  private void runBenchmark()
-      throws Exception {
-    // Start ZK and Kafka
-    startZk();
-    StreamDataServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-
-    // Create Kafka topic
-    kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(10));
-
-    // Unpack data (needed to get the Avro schema)
-    TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
-        RealtimeClusterIntegrationTest.class.getClassLoader()
-            .getResource("On_Time_On_Time_Performance_2014_100k_subset_nonulls.tar.gz"))), _tmpDir);
-
-    _tmpDir.mkdirs();
-    final List<File> avroFiles = new ArrayList<File>(SEGMENT_COUNT);
-    for (int segmentNumber = 1; segmentNumber <= SEGMENT_COUNT; ++segmentNumber) {
-      avroFiles.add(new File(_tmpDir.getPath() + "/On_Time_On_Time_Performance_2014_" + segmentNumber + ".avro"));
-    }
-
-    File schemaFile = new File(OfflineClusterIntegrationTest.class.getClassLoader()
-        .getResource("On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema").getFile());
-
-    // Start the Pinot cluster
-    startController();
-    startBroker();
-    startServer();
-
-    // Create realtime table
-    setUpRealtimeTable(avroFiles.get(0));
-
-    // Wait a couple of seconds for all Helix state transitions to happen
-    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
-
-    // Generate ROW_COUNT rows and write them into Kafka
-    ClusterIntegrationTestUtils
-        .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT,
-            getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
-    rowsWritten += ROW_COUNT;
-
-    // Run forever until something breaks or the timeout completes
-    long pinotRecordCount = -1L;
-    long timeAfterTimeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
-    do {
-      Thread.sleep(500L);
-
-      // Run the query
-      try {
-        JsonNode response = postQuery("select count(*) from mytable");
-        pinotRecordCount = response.get("aggregationResults").get(0).get("value").asLong();
-      } catch (Exception e) {
-        // Ignore
-        continue;
-      }
-
-      // Write more rows if needed
-      if (rowsWritten - pinotRecordCount < MIN_ROW_COUNT) {
-        ClusterIntegrationTestUtils
-            .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
-                ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
-        rowsWritten += ROW_COUNT;
-      }
-
-      System.out.println("Pinot record count: " + pinotRecordCount);
-      if (timeAfterTimeout < System.currentTimeMillis()) {
-        throw new RuntimeException("Timeout exceeded!");
-      }
-    } while (true);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org