You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/13 01:12:51 UTC

[incubator-pinot] 01/01: Add config to set batchMessageMode on ideal state of new tables

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch batch_update_mode
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b15bac732121a394ebe37a84b1f3d30827cc9e8f
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed Dec 12 17:12:31 2018 -0800

    Add config to set batchMessageMode on ideal state of new tables
---
 .../pinot/broker/broker/HelixBrokerStarterTest.java  |  4 +---
 .../linkedin/pinot/controller/ControllerConf.java    |  6 ++++++
 .../helix/core/PinotHelixResourceManager.java        | 18 +++++++++++-------
 .../helix/core/PinotTableIdealStateBuilder.java      | 20 +++++++++++++-------
 .../controller/helix/core/util/HelixSetupUtils.java  | 11 ++++++-----
 .../controller/helix/PinotResourceManagerTest.java   |  2 +-
 .../realtime/PinotLLCRealtimeSegmentManagerTest.java |  6 +++---
 .../helix/core/retention/RetentionManagerTest.java   |  2 +-
 .../core/sharding/SegmentAssignmentStrategyTest.java |  4 ++--
 .../controller/validation/ValidationManagerTest.java |  4 ++--
 10 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/broker/HelixBrokerStarterTest.java
index 359af1c..38f26e5 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -21,10 +21,8 @@ import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter;
 import com.linkedin.pinot.broker.routing.HelixExternalViewBasedRouting;
 import com.linkedin.pinot.broker.routing.TimeBoundaryService;
 import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
-import com.linkedin.pinot.common.config.IndexingConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
-import com.linkedin.pinot.common.data.FieldSpec;
 import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
@@ -75,7 +73,7 @@ public class HelixBrokerStarterTest {
     _zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
     final String instanceId = "localhost_helixController";
     _pinotResourceManager =
-        new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, instanceId, null, 10000L, true, /*isUpdateStateModel=*/false);
+        new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, instanceId, null, 10000L, true, /*isUpdateStateModel=*/false, true);
     _pinotResourceManager.start();
     _helixAdmin = _pinotResourceManager.getHelixAdmin();
 
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index f7214bd..b391765 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -74,6 +74,7 @@ public class ControllerConf extends PropertiesConfiguration {
   // separate interval
   private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
       "controller.segment.level.validation.intervalInSeconds";
+  private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode";
 
   // Defines the kind of storage and the underlying PinotFS implementation
   private static final String PINOT_FS_FACTORY_CLASS_PREFIX = "controller.storage.factory.class";
@@ -96,6 +97,7 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
+  private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = true;
   private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
@@ -459,6 +461,10 @@ public class ControllerConf extends PropertiesConfiguration {
     return getBoolean(ENABLE_STORAGE_QUOTA_CHECK, DEFAULT_ENABLE_STORAGE_QUOTA_CHECK);
   }
 
+  public boolean getEnableBatchMessageMode() {
+    return getBoolean(ENABLE_BATCH_MESSAGE_MODE, DEFAULT_ENABLE_BATCH_MESSAGE_MODE);
+  }
+
   public int getSegmentLevelValidationIntervalInSeconds() {
     return getInt(SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
   }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index c12f70e..971c908 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -122,6 +122,7 @@ public class PinotHelixResourceManager {
   private final long _externalViewOnlineToOfflineTimeoutMillis;
   private final boolean _isSingleTenantCluster;
   private final boolean _isUpdateStateModel;
+  private final boolean _enableBatchMessageMode;
 
   private HelixManager _helixZkManager;
   private HelixAdmin _helixAdmin;
@@ -133,7 +134,7 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, String dataDir, long externalViewOnlineToOfflineTimeoutMillis,
-      boolean isSingleTenantCluster, boolean isUpdateStateModel) {
+      boolean isSingleTenantCluster, boolean isUpdateStateModel, boolean enableBatchMessageMode) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _instanceId = controllerInstanceId;
@@ -141,26 +142,28 @@ public class PinotHelixResourceManager {
     _externalViewOnlineToOfflineTimeoutMillis = externalViewOnlineToOfflineTimeoutMillis;
     _isSingleTenantCluster = isSingleTenantCluster;
     _isUpdateStateModel = isUpdateStateModel;
+    _enableBatchMessageMode = enableBatchMessageMode;
   }
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
     this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS,
-        false, false);
+        false, false, true);
   }
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
         controllerConf.getControllerHost() + "_" + controllerConf.getControllerPort(), controllerConf.getDataDir(),
         controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
-        controllerConf.isUpdateSegmentStateModel());
+        controllerConf.isUpdateSegmentStateModel(), controllerConf.getEnableBatchMessageMode());
   }
 
   /**
    * Create Helix cluster if needed, and then start a Pinot controller instance.
    */
   public synchronized void start() {
-    _helixZkManager = HelixSetupUtils.setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel);
+    _helixZkManager = HelixSetupUtils.setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel,
+        _enableBatchMessageMode);
     Preconditions.checkNotNull(_helixZkManager);
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
@@ -1072,7 +1075,7 @@ public class PinotHelixResourceManager {
         // now lets build an ideal state
         LOGGER.info("building empty ideal state for table : " + tableNameWithType);
         final IdealState offlineIdealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType,
-            Integer.parseInt(segmentsConfig.getReplication()));
+            Integer.parseInt(segmentsConfig.getReplication()), _enableBatchMessageMode);
         LOGGER.info("adding table via the admin");
         _helixAdmin.addResource(_helixClusterName, tableNameWithType, offlineIdealState);
         LOGGER.info("successfully added the table : " + tableNameWithType + " to the cluster");
@@ -1208,7 +1211,8 @@ public class PinotHelixResourceManager {
       // Will either create idealstate entry, or update the IS entry with new segments
       // (unless there are low-level segments already present)
       if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
-        PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(realtimeTableName, config, idealState);
+        PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(realtimeTableName, config, idealState,
+            _enableBatchMessageMode);
         LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
       } else {
         LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
@@ -1220,7 +1224,7 @@ public class PinotHelixResourceManager {
       IdealState idealState) {
     if (idealState == null) {
       idealState = PinotTableIdealStateBuilder.buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, config,
-          _helixZkManager, _propertyStore);
+          _helixZkManager, _propertyStore, _enableBatchMessageMode);
       LOGGER.info("Adding helix resource with empty HLC IdealState for {}", realtimeTableName);
       _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
     } else {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 5a7b09e..6c98b54 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -58,7 +58,7 @@ public class PinotTableIdealStateBuilder {
    * @param numCopies is the number of replicas
    * @return
    */
-  public static IdealState buildEmptyIdealStateFor(String tableName, int numCopies) {
+  public static IdealState buildEmptyIdealStateFor(String tableName, int numCopies, boolean enableBatchMessageMode) {
     final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableName);
     final int replicas = numCopies;
     customModeIdealStateBuilder
@@ -66,6 +66,7 @@ public class PinotTableIdealStateBuilder {
         .setNumPartitions(0).setNumReplica(replicas).setMaxPartitionsPerNode(1);
     final IdealState idealState = customModeIdealStateBuilder.build();
     idealState.setInstanceGroupTag(tableName);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
     return idealState;
   }
 
@@ -78,7 +79,8 @@ public class PinotTableIdealStateBuilder {
    * @param helixClusterName
    * @return
    */
-  public static IdealState buildEmptyIdealStateForBrokerResource(HelixAdmin helixAdmin, String helixClusterName) {
+  public static IdealState buildEmptyIdealStateForBrokerResource(HelixAdmin helixAdmin, String helixClusterName,
+      boolean enableBatchMessageMode) {
     final CustomModeISBuilder customModeIdealStateBuilder =
         new CustomModeISBuilder(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     customModeIdealStateBuilder
@@ -87,6 +89,7 @@ public class PinotTableIdealStateBuilder {
         .setMaxPartitionsPerNode(Integer.MAX_VALUE).setNumReplica(Integer.MAX_VALUE)
         .setNumPartitions(Integer.MAX_VALUE);
     final IdealState idealState = customModeIdealStateBuilder.build();
+    idealState.setBatchMessageMode(enableBatchMessageMode);
     return idealState;
   }
 
@@ -97,11 +100,12 @@ public class PinotTableIdealStateBuilder {
   }
 
   public static IdealState buildInitialHighLevelRealtimeIdealStateFor(String realtimeTableName,
-      TableConfig realtimeTableConfig, HelixManager helixManager, ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore) {
+      TableConfig realtimeTableConfig, HelixManager helixManager, ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore,
+      boolean enableBatchMessageMode) {
     RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(realtimeTableConfig);
     final List<String> realtimeInstances =
         HelixHelper.getInstancesWithTag(helixManager, realtimeTagConfig.getConsumingServerTag());
-    IdealState idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, 1);
+    IdealState idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, 1, enableBatchMessageMode);
     if (realtimeInstances.size() % Integer.parseInt(realtimeTableConfig.getValidationConfig().getReplication()) != 0) {
       throw new RuntimeException(
           "Number of instance in current tenant should be an integer multiples of the number of replications");
@@ -113,7 +117,7 @@ public class PinotTableIdealStateBuilder {
   }
 
   public static void buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
-      IdealState idealState) {
+      IdealState idealState, boolean enableBatchMessageMode) {
 
     // Validate replicasPerPartition here.
     final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -128,7 +132,7 @@ public class PinotTableIdealStateBuilder {
           "Invalid value for replicasPerPartition, expected a number: " + replicasPerPartitionStr, e);
     }
     if (idealState == null) {
-      idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas);
+      idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
     }
     final PinotLLCRealtimeSegmentManager segmentManager = PinotLLCRealtimeSegmentManager.getInstance();
     try {
@@ -150,13 +154,15 @@ public class PinotTableIdealStateBuilder {
     }
   }
 
-  public static IdealState buildEmptyRealtimeIdealStateFor(String realtimeTableName, int replicaCount) {
+  public static IdealState buildEmptyRealtimeIdealStateFor(String realtimeTableName, int replicaCount,
+      boolean enableBatchMessageMode) {
     final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(realtimeTableName);
     customModeIdealStateBuilder
         .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
         .setNumPartitions(0).setNumReplica(replicaCount).setMaxPartitionsPerNode(1);
     final IdealState idealState = customModeIdealStateBuilder.build();
     idealState.setInstanceGroupTag(realtimeTableName);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
 
     return idealState;
   }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/util/HelixSetupUtils.java
index 6f18b22..2142f60 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -61,10 +61,10 @@ public class HelixSetupUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixSetupUtils.class);
 
   public static synchronized HelixManager setup(String helixClusterName, String zkPath, String pinotControllerInstanceId,
-      boolean isUpdateStateModel) {
+      boolean isUpdateStateModel, boolean enableBatchMessageMode) {
 
     try {
-      createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel);
+      createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, enableBatchMessageMode);
     } catch (final Exception e) {
       LOGGER.error("Caught exception", e);
       return null;
@@ -82,7 +82,8 @@ public class HelixSetupUtils {
     createHelixClusterIfNeeded(helixClusterName, zkPath);
   }
 
-  public static void createHelixClusterIfNeeded(String helixClusterName, String zkPath, boolean isUpdateStateModel) {
+  public static void createHelixClusterIfNeeded(String helixClusterName, String zkPath, boolean isUpdateStateModel,
+      boolean enableBatchMessageMode) {
     final HelixAdmin admin = new ZKHelixAdmin(zkPath);
     final String segmentStateModelName = PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
 
@@ -146,8 +147,8 @@ public class HelixSetupUtils {
     LOGGER.info("Adding empty ideal state for Broker!");
     HelixHelper.updateResourceConfigsFor(new HashMap<String, String>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
         helixClusterName, admin);
-    IdealState idealState =
-        PinotTableIdealStateBuilder.buildEmptyIdealStateForBrokerResource(admin, helixClusterName);
+    IdealState idealState = PinotTableIdealStateBuilder.buildEmptyIdealStateForBrokerResource(admin, helixClusterName,
+        enableBatchMessageMode);
     admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
     initPropertyStorePath(helixClusterName, zkPath);
     LOGGER.info("New Cluster setup completed... ********************************************** ");
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
index 2cd4f0f..e3c59d4 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
@@ -52,7 +52,7 @@ public class PinotResourceManagerTest {
     final String instanceId = "localhost_helixController";
     _pinotHelixResourceManager =
         new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, instanceId, null, 10000L, true,
-            /*isUpdateStateModel=*/ false);
+            /*isUpdateStateModel=*/ false, true);
     _pinotHelixResourceManager.start();
     _helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
 
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index a5653a8..43ade02 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -857,7 +857,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     TableConfig tableConfig = makeTableConfig(rtTableName, 3, DUMMY_HOST, DEFAULT_SERVER_TENANT);
     segmentManager.addTableToStore(rtTableName, tableConfig, 8);
-    IdealState idealState = PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(rtTableName, 10);
+    IdealState idealState = PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(rtTableName, 10, true);
     try {
       segmentManager.setupNewTable(tableConfig, idealState);
       Assert.fail("Did not get expected exception when setting up new table with existing segments in ");
@@ -884,7 +884,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager.addTableToStore(tableName, tableConfig, nPartitions);
 
     IdealState idealState =
-        PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(tableName, nReplicas);
+        PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(tableName, nReplicas, true);
     segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
     segmentManager.setupNewTable(tableConfig, idealState);
     // Now commit the first segment of partition 6.
@@ -1113,7 +1113,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     TableConfig tableConfig =
         makeTableConfig(rtTableName, nReplicas, DUMMY_HOST, DEFAULT_SERVER_TENANT);
     IdealState idealState =
-        PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(rtTableName, nReplicas);
+        PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(rtTableName, nReplicas, true);
 
     segmentManager.addTableToStore(rtTableName, tableConfig, nPartitions);
     segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 1baf36f..1da115e 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -222,7 +222,7 @@ public class RetentionManagerTest {
     List<RealtimeSegmentZKMetadata> allSegments = new ArrayList<>();
 
     IdealState idealState =
-        PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(REALTIME_TABLE_NAME, replicaCount);
+        PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true);
 
     final int kafkaPartition = 5;
     final long millisInDays = TimeUnit.DAYS.toMillis(1);
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
index 748c02a..326932f 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
@@ -76,11 +76,11 @@ public class SegmentAssignmentStrategyTest {
     final String instanceId = "localhost_helixController";
     _pinotHelixResourceManager =
         new PinotHelixResourceManager(ZK_SERVER, HELIX_CLUSTER_NAME, instanceId, null, 10000L, true, /*isUpdateStateModel=*/
-            false);
+            false, true);
     _pinotHelixResourceManager.start();
 
     final String helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(ZK_SERVER);
-    _helixZkManager = HelixSetupUtils.setup(HELIX_CLUSTER_NAME, helixZkURL, instanceId, /*isUpdateStateModel=*/false);
+    _helixZkManager = HelixSetupUtils.setup(HELIX_CLUSTER_NAME, helixZkURL, instanceId, /*isUpdateStateModel=*/false, true);
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _partitionAssignmentGenerator =
         new ReplicaGroupPartitionAssignmentGenerator(_helixZkManager.getHelixPropertyStore());
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
index 939abf2..6d5394a 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
@@ -76,7 +76,7 @@ public class ValidationManagerTest {
     _pinotHelixResourceManager =
         new PinotHelixResourceManager(ZK_STR, HELIX_CLUSTER_NAME, CONTROLLER_INSTANCE_NAME, null, 1000L,
             true, /*isUpdateStateModel=*/
-            false);
+            false, true);
     _pinotHelixResourceManager.start();
 
     ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_STR, 2, true);
@@ -87,7 +87,7 @@ public class ValidationManagerTest {
         .build();
 
     final String instanceId = "localhost_helixController";
-    _helixManager = HelixSetupUtils.setup(HELIX_CLUSTER_NAME, ZK_STR, instanceId, /*isUpdateStateModel=*/false);
+    _helixManager = HelixSetupUtils.setup(HELIX_CLUSTER_NAME, ZK_STR, instanceId, /*isUpdateStateModel=*/false, true);
     _pinotHelixResourceManager.addTable(_offlineTableConfig);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org