You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/03/09 05:25:43 UTC

[pinot] branch master updated: Allow replica group assignment support in tier configs (#10255)

This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c3c8e8705 Allow replica group assignment support in tier configs (#10255)
7c3c8e8705 is described below

commit 7c3c8e87052edca26648a646850b2b17ec0aa690
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Thu Mar 9 10:55:35 2023 +0530

    Allow replica group assignment support in tier configs (#10255)
    
    * Allow replica group assignment support in tier configs
    
    * Add reassignment logic
    
    * Add tier to assignInstances
    
    * Make changes to APIs
    
    * Consolidate tier assignment configs inside InstanceAssignmentConfigsMap
    
    * Add removal logic for tier partitions
    
    * Lint fix
    
    * Review comments
    
    * Review comments
    
    * Add tests
    
    * Fix java8 quickstart
    
    * Fix test
    
    ---------
    
    Co-authored-by: Saurabh Dubey <sa...@Saurabhs-MacBook-Pro.local>
---
 .../assignment/InstanceAssignmentConfigUtils.java  |  20 +--
 .../common/assignment/InstancePartitionsUtils.java |  16 ++
 .../pinot/common/metadata/ZKMetadataProvider.java  |  16 ++
 .../common/utils/config/TableConfigUtils.java      |  13 +-
 .../common/utils/config/TableConfigSerDeTest.java  |  10 +-
 .../PinotInstanceAssignmentRestletResource.java    | 188 +++++++++++++++------
 .../api/resources/PinotTableRestletResource.java   |   2 +-
 .../helix/core/PinotHelixResourceManager.java      |  32 +++-
 .../instance/InstanceAssignmentDriver.java         |  32 +++-
 .../helix/core/rebalance/RebalanceResult.java      |   8 +
 .../helix/core/rebalance/TableRebalancer.java      | 111 ++++++++----
 ...anceAssignmentRestletResourceStatelessTest.java | 141 ++++++++++------
 .../instance/InstanceAssignmentTest.java           |  64 +++----
 .../TableRebalancerClusterStatelessTest.java       | 127 +++++++++++++-
 .../segment/local/utils/TableConfigUtils.java      |   3 +-
 .../segment/local/utils/TableConfigUtilsTest.java  |   8 +-
 .../apache/pinot/spi/config/table/TableConfig.java |   8 +-
 .../utils/builder/ControllerRequestURLBuilder.java |   2 +-
 .../spi/utils/builder/TableConfigBuilder.java      |   4 +-
 19 files changed, 594 insertions(+), 211 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index 6a0ae1188e..b571918c0c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -44,11 +44,10 @@ public class InstanceAssignmentConfigUtils {
    * backward-compatibility) COMPLETED server tag is overridden to be different from the CONSUMING server tag.
    */
   public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
-    Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
     return (instanceAssignmentConfigMap != null
-        && instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED) != null) || TagNameUtils
-        .isRelocateCompletedSegments(tableConfig.getTenantConfig());
+        && instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED.toString()) != null)
+        || TagNameUtils.isRelocateCompletedSegments(tableConfig.getTenantConfig());
   }
 
   /**
@@ -60,21 +59,20 @@ public class InstanceAssignmentConfigUtils {
       return true;
     }
     TableType tableType = tableConfig.getTableType();
-    Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
     switch (instancePartitionsType) {
       // Allow OFFLINE instance assignment if the offline table has it configured or (for backward-compatibility) is
       // using replica-group segment assignment
       case OFFLINE:
         return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap != null
-            && instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE) != null)
+            && instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString()) != null)
             || AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
             .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));
       // Allow CONSUMING/COMPLETED instance assignment if the real-time table has it configured
       case CONSUMING:
       case COMPLETED:
         return tableType == TableType.REALTIME && (instanceAssignmentConfigMap != null
-            && instanceAssignmentConfigMap.get(instancePartitionsType) != null);
+            && instanceAssignmentConfigMap.get(instancePartitionsType.toString()) != null);
       default:
         throw new IllegalStateException();
     }
@@ -89,10 +87,10 @@ public class InstanceAssignmentConfigUtils {
         "Instance assignment is not allowed for the given table config");
 
     // Use the instance assignment config from the table config if it exists
-    Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
     if (instanceAssignmentConfigMap != null) {
-      InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMap.get(instancePartitionsType);
+      InstanceAssignmentConfig instanceAssignmentConfig =
+          instanceAssignmentConfigMap.get(instancePartitionsType.toString());
       if (instanceAssignmentConfig != null) {
         return instanceAssignmentConfig;
       }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
index a15554f3d3..759d387af4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
@@ -46,6 +46,7 @@ public class InstancePartitionsUtils {
   }
 
   public static final char TYPE_SUFFIX_SEPARATOR = '_';
+  public static final String TIER_SUFFIX = "__TIER__";
 
   /**
    * Returns the name of the instance partitions for the given table name (with or without type suffix) and instance
@@ -93,6 +94,11 @@ public class InstancePartitionsUtils {
     return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
   }
 
+  public static String getInstancePartitionsNameForTier(String tableName, String tierName) {
+    return TableNameBuilder.extractRawTableName(tableName) + TIER_SUFFIX + tierName;
+  }
+
+
   /**
    * Gets the instance partitions with the given name, and returns a re-named copy of the same.
    * This method is useful when we use a table with instancePartitionsMap since in that case
@@ -177,4 +183,14 @@ public class InstancePartitionsUtils {
       throw new ZkException("Failed to remove instance partitions: " + instancePartitionsName);
     }
   }
+
+  public static void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    List<InstancePartitions> instancePartitions = ZKMetadataProvider.getAllInstancePartitions(propertyStore);
+    instancePartitions.stream().filter(instancePartition -> instancePartition.getInstancePartitionsName()
+            .startsWith(TableNameBuilder.extractRawTableName(tableNameWithType) + TIER_SUFFIX))
+        .forEach(instancePartition -> {
+          removeInstancePartitions(propertyStore, instancePartition.getInstancePartitionsName());
+        });
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 623a856454..b14ba30391 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -26,9 +26,11 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
+import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.SchemaUtils;
@@ -261,6 +263,20 @@ public class ZKMetadataProvider {
     }
   }
 
+  @Nullable
+  public static List<InstancePartitions> getAllInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore) {
+    List<ZNRecord> znRecordss =
+        propertyStore.getChildren(PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, null, AccessOption.PERSISTENT);
+
+    try {
+      return Optional.ofNullable(znRecordss).orElseGet(ArrayList::new).stream().map(InstancePartitions::fromZNRecord)
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while getting instance partitions", e);
+      return null;
+    }
+  }
+
   @Nullable
   public static List<UserConfig> getAllUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore) {
     List<ZNRecord> znRecordss =
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 8abb0ea964..9af13f44bc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -113,11 +113,11 @@ public class TableConfigUtils {
       queryConfig = JsonUtils.stringToObject(queryConfigString, QueryConfig.class);
     }
 
-    Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap = null;
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = null;
     String instanceAssignmentConfigMapString = simpleFields.get(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY);
     if (instanceAssignmentConfigMapString != null) {
       instanceAssignmentConfigMap = JsonUtils.stringToObject(instanceAssignmentConfigMapString,
-          new TypeReference<Map<InstancePartitionsType, InstanceAssignmentConfig>>() {
+          new TypeReference<Map<String, InstanceAssignmentConfig>>() {
           });
     }
 
@@ -181,9 +181,9 @@ public class TableConfigUtils {
     }
 
     return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
-        quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap,
-        fieldConfigList, upsertConfig, dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, isDimTable,
-        tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap);
+        quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
+        dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList,
+        instancePartitionsMap, segmentAssignmentConfigMap);
   }
 
   public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -216,8 +216,7 @@ public class TableConfigUtils {
     if (queryConfig != null) {
       simpleFields.put(TableConfig.QUERY_CONFIG_KEY, queryConfig.toJsonString());
     }
-    Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
     if (instanceAssignmentConfigMap != null) {
       simpleFields
           .put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(instanceAssignmentConfigMap));
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index aad62db8f7..30dbfe80b4 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -212,7 +212,7 @@ public class TableConfigSerDeTest {
               new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
               new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false));
       TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
-          Collections.singletonMap(InstancePartitionsType.OFFLINE, instanceAssignmentConfig)).build();
+          Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
 
       checkInstanceAssignmentConfig(tableConfig);
 
@@ -488,12 +488,12 @@ public class TableConfigSerDeTest {
   }
 
   private void checkInstanceAssignmentConfig(TableConfig tableConfig) {
-    Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
     assertNotNull(instanceAssignmentConfigMap);
     assertEquals(instanceAssignmentConfigMap.size(), 1);
-    assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE));
-    InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE);
+    assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+    InstanceAssignmentConfig instanceAssignmentConfig =
+        instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString());
 
     InstanceTagPoolConfig tagPoolConfig = instanceAssignmentConfig.getTagPoolConfig();
     assertEquals(tagPoolConfig.getTag(), "tenant_OFFLINE");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index dfdaefafaa..aee7df4e8a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +46,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -57,6 +59,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -80,39 +83,57 @@ public class PinotInstanceAssignmentRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tables/{tableName}/instancePartitions")
   @ApiOperation(value = "Get the instance partitions")
-  public Map<InstancePartitionsType, InstancePartitions> getInstancePartitions(
+  public Map<String, InstancePartitions> getInstancePartitions(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
-          InstancePartitionsType instancePartitionsType) {
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") @QueryParam("type") @Nullable String type) {
+    Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
 
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType != TableType.REALTIME) {
-      if (instancePartitionsType == InstancePartitionsType.OFFLINE || instancePartitionsType == null) {
-        InstancePartitions offlineInstancePartitions = InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.OFFLINE.toString().equals(type) || type == null) {
+        InstancePartitions offlineInstancePartitions =
+            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
         if (offlineInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.OFFLINE, offlineInstancePartitions);
+          instancePartitionsMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstancePartitions);
         }
       }
     }
     if (tableType != TableType.OFFLINE) {
-      if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
-        InstancePartitions consumingInstancePartitions = InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.CONSUMING.toString().equals(type) || type == null) {
+        InstancePartitions consumingInstancePartitions =
+            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
         if (consumingInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
+          instancePartitionsMap.put(InstancePartitionsType.CONSUMING.toString(), consumingInstancePartitions);
         }
       }
-      if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
-        InstancePartitions completedInstancePartitions = InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.COMPLETED.toString().equals(type) || type == null) {
+        InstancePartitions completedInstancePartitions =
+            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
         if (completedInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions);
+          instancePartitionsMap.put(InstancePartitionsType.COMPLETED.toString(), completedInstancePartitions);
+        }
+      }
+    }
+
+    List<TableConfig> tableConfigs = Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+        _resourceManager.getOfflineTableConfig(tableName));
+
+    for (TableConfig tableConfig : tableConfigs) {
+      if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+          if (type == null || type.equals(tierConfig.getName())) {
+            InstancePartitions instancePartitions =
+                InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+                    InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+                        tierConfig.getName()));
+            if (instancePartitions != null) {
+              instancePartitionsMap.put(tierConfig.getName(), instancePartitions);
+            }
+          }
         }
       }
     }
@@ -130,22 +151,20 @@ public class PinotInstanceAssignmentRestletResource {
   @Path("/tables/{tableName}/assignInstances")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Assign server instances to a table")
-  public Map<InstancePartitionsType, InstancePartitions> assignInstances(
+  public Map<String, InstancePartitions> assignInstances(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
-          InstancePartitionsType instancePartitionsType,
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") @QueryParam("type") @Nullable String type,
       @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun) {
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
+    Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
     List<InstanceConfig> instanceConfigs = _resourceManager.getAllHelixInstanceConfigs();
 
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
-    if (tableType != TableType.REALTIME && (instancePartitionsType == InstancePartitionsType.OFFLINE
-        || instancePartitionsType == null)) {
+    if (tableType != TableType.REALTIME && (InstancePartitionsType.OFFLINE.toString().equals(type) || type == null)) {
       TableConfig offlineTableConfig = _resourceManager.getOfflineTableConfig(tableName);
       if (offlineTableConfig != null) {
         try {
-          if (InstanceAssignmentConfigUtils
-              .allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) {
+          if (InstanceAssignmentConfigUtils.allowInstanceAssignment(offlineTableConfig,
+              InstancePartitionsType.OFFLINE)) {
             assignInstancesForInstancePartitionsType(instancePartitionsMap, offlineTableConfig, instanceConfigs,
                 InstancePartitionsType.OFFLINE);
           }
@@ -158,20 +177,20 @@ public class PinotInstanceAssignmentRestletResource {
         }
       }
     }
-    if (tableType != TableType.OFFLINE && instancePartitionsType != InstancePartitionsType.OFFLINE) {
+    if (tableType != TableType.OFFLINE && !InstancePartitionsType.OFFLINE.toString().equals(type)) {
       TableConfig realtimeTableConfig = _resourceManager.getRealtimeTableConfig(tableName);
       if (realtimeTableConfig != null) {
         try {
-          if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
-            if (InstanceAssignmentConfigUtils
-                .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) {
+          if (InstancePartitionsType.CONSUMING.toString().equals(type) || type == null) {
+            if (InstanceAssignmentConfigUtils.allowInstanceAssignment(realtimeTableConfig,
+                InstancePartitionsType.CONSUMING)) {
               assignInstancesForInstancePartitionsType(instancePartitionsMap, realtimeTableConfig, instanceConfigs,
                   InstancePartitionsType.CONSUMING);
             }
           }
-          if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
-            if (InstanceAssignmentConfigUtils
-                .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) {
+          if (InstancePartitionsType.COMPLETED.toString().equals(type) || type == null) {
+            if (InstanceAssignmentConfigUtils.allowInstanceAssignment(realtimeTableConfig,
+                InstancePartitionsType.COMPLETED)) {
               assignInstancesForInstancePartitionsType(instancePartitionsMap, realtimeTableConfig, instanceConfigs,
                   InstancePartitionsType.COMPLETED);
             }
@@ -186,6 +205,16 @@ public class PinotInstanceAssignmentRestletResource {
       }
     }
 
+    TableConfig realtimeTableConfig = _resourceManager.getRealtimeTableConfig(tableName);
+    if (realtimeTableConfig != null) {
+      assignInstancesForTier(instancePartitionsMap, realtimeTableConfig, instanceConfigs, type);
+    }
+
+    TableConfig offlineTableConfig = _resourceManager.getOfflineTableConfig(tableName);
+    if (offlineTableConfig != null) {
+      assignInstancesForTier(instancePartitionsMap, offlineTableConfig, instanceConfigs, type);
+    }
+
     if (instancePartitionsMap.isEmpty()) {
       throw new ControllerApplicationException(LOGGER, "Failed to find the instance assignment config",
           Response.Status.NOT_FOUND);
@@ -207,22 +236,43 @@ public class PinotInstanceAssignmentRestletResource {
    * @param instanceConfigs list of instance configs
    * @param instancePartitionsType type of instancePartitions
    */
-  private void assignInstancesForInstancePartitionsType(
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
-      List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
+  private void assignInstancesForInstancePartitionsType(Map<String, InstancePartitions> instancePartitionsMap,
+      TableConfig tableConfig, List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
     String tableNameWithType = tableConfig.getTableName();
     if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
       String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
-      instancePartitionsMap.put(instancePartitionsType, InstancePartitionsUtils.fetchInstancePartitionsWithRename(
-          _resourceManager.getPropertyStore(), tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
-          instancePartitionsType.getInstancePartitionsName(rawTableName)));
+      instancePartitionsMap.put(instancePartitionsType.toString(),
+          InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+              tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
+              instancePartitionsType.getInstancePartitionsName(rawTableName)));
       return;
     }
     InstancePartitions existingInstancePartitions =
         InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
             InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
-    instancePartitionsMap.put(instancePartitionsType, new InstanceAssignmentDriver(tableConfig)
-        .assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions));
+    instancePartitionsMap.put(instancePartitionsType.toString(),
+        new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
+            existingInstancePartitions));
+  }
+
+  private void assignInstancesForTier(Map<String, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
+      List<InstanceConfig> instanceConfigs, String tierName) {
+    if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+        && tableConfig.getInstanceAssignmentConfigMap() != null) {
+      for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+        if ((tierConfig.getName().equals(tierName) || tierName == null)
+            && tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()) != null) {
+          InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions(
+              _resourceManager.getHelixZkManager().getHelixPropertyStore(),
+              InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+                  tierConfig.getName()));
+
+          instancePartitionsMap.put(tierConfig.getName(),
+              new InstanceAssignmentDriver(tableConfig).assignInstances(tierConfig.getName(), instanceConfigs,
+                  existingInstancePartitions, tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName())));
+        }
+      }
+    }
   }
 
   private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
@@ -240,7 +290,7 @@ public class PinotInstanceAssignmentRestletResource {
   @Path("/tables/{tableName}/instancePartitions")
   @Authenticate(AccessType.UPDATE)
   @ApiOperation(value = "Create/update the instance partitions")
-  public Map<InstancePartitionsType, InstancePartitions> setInstancePartitions(
+  public Map<String, InstancePartitions> setInstancePartitions(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, String instancePartitionsStr) {
     InstancePartitions instancePartitions;
     try {
@@ -256,17 +306,32 @@ public class PinotInstanceAssignmentRestletResource {
     if (tableType != TableType.REALTIME) {
       if (InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) {
         persistInstancePartitionsHelper(instancePartitions);
-        return Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitions);
+        return Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instancePartitions);
       }
     }
     if (tableType != TableType.OFFLINE) {
       if (InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) {
         persistInstancePartitionsHelper(instancePartitions);
-        return Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
+        return Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), instancePartitions);
       }
       if (InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) {
         persistInstancePartitionsHelper(instancePartitions);
-        return Collections.singletonMap(InstancePartitionsType.COMPLETED, instancePartitions);
+        return Collections.singletonMap(InstancePartitionsType.COMPLETED.toString(), instancePartitions);
+      }
+    }
+
+    List<TableConfig> tableConfigs = Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+        _resourceManager.getOfflineTableConfig(tableName));
+
+    for (TableConfig tableConfig : tableConfigs) {
+      if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+          if (InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName())
+              .equals(instancePartitionsName)) {
+            persistInstancePartitionsHelper(instancePartitions);
+            return Collections.singletonMap(tierConfig.getName(), instancePartitions);
+          }
+        }
       }
     }
 
@@ -281,22 +346,39 @@ public class PinotInstanceAssignmentRestletResource {
   @ApiOperation(value = "Remove the instance partitions")
   public SuccessResponse removeInstancePartitions(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
-          InstancePartitionsType instancePartitionsType) {
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") @QueryParam("type") @Nullable
+          String instancePartitionsType) {
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
-    if (tableType != TableType.REALTIME && (instancePartitionsType == InstancePartitionsType.OFFLINE
+    if (tableType != TableType.REALTIME && (InstancePartitionsType.OFFLINE.toString().equals(instancePartitionsType)
         || instancePartitionsType == null)) {
       removeInstancePartitionsHelper(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
     }
     if (tableType != TableType.OFFLINE) {
-      if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
+      if (InstancePartitionsType.CONSUMING.toString().equals(instancePartitionsType)
+          || instancePartitionsType == null) {
         removeInstancePartitionsHelper(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
       }
-      if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
+      if (InstancePartitionsType.COMPLETED.toString().equals(instancePartitionsType)
+          || instancePartitionsType == null) {
         removeInstancePartitionsHelper(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
       }
     }
+
+    List<TableConfig> tableConfigs = Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+        _resourceManager.getOfflineTableConfig(tableName));
+
+    for (TableConfig tableConfig : tableConfigs) {
+      if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+          if (instancePartitionsType == null || instancePartitionsType.equals(tierConfig.getName())) {
+            removeInstancePartitionsHelper(
+                InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+                    tierConfig.getName()));
+          }
+        }
+      }
+    }
     return new SuccessResponse("Instance partitions removed");
   }
 
@@ -315,16 +397,16 @@ public class PinotInstanceAssignmentRestletResource {
   @Path("/tables/{tableName}/replaceInstance")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Replace an instance in the instance partitions")
-  public Map<InstancePartitionsType, InstancePartitions> replaceInstance(
+  public Map<String, InstancePartitions> replaceInstance(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
-          InstancePartitionsType instancePartitionsType,
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") @QueryParam("type") @Nullable
+          String type,
       @ApiParam(value = "Old instance to be replaced", required = true) @QueryParam("oldInstanceId")
           String oldInstanceId,
       @ApiParam(value = "New instance to replace with", required = true) @QueryParam("newInstanceId")
           String newInstanceId) {
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
-        getInstancePartitions(tableName, instancePartitionsType);
+    Map<String, InstancePartitions> instancePartitionsMap =
+        getInstancePartitions(tableName, type);
     Iterator<InstancePartitions> iterator = instancePartitionsMap.values().iterator();
     while (iterator.hasNext()) {
       InstancePartitions instancePartitions = iterator.next();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index cfa1cffe2d..e43b244893 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -683,7 +683,7 @@ public class PinotTableRestletResource {
           });
           return new RebalanceResult(RebalanceResult.Status.IN_PROGRESS,
               "In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(),
-              dryRunResult.getSegmentAssignment());
+              dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment());
         } else {
           // If dry-run failed or is no-op, return the dry-run result
           return dryRunResult;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 91dc57956f..48418f2bda 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -147,6 +147,7 @@ import org.apache.pinot.spi.config.table.TableStats;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.tenant.Tenant;
@@ -1734,10 +1735,10 @@ public class PinotHelixResourceManager {
       }
     }
 
+    InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
+    List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
     if (!instancePartitionsTypesToAssign.isEmpty()) {
       LOGGER.info("Assigning {} instances to table: {}", instancePartitionsTypesToAssign, tableNameWithType);
-      InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
-      List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
       for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
         boolean hasPreConfiguredInstancePartitions =
             TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType);
@@ -1757,6 +1758,26 @@ public class PinotHelixResourceManager {
         }
       }
     }
+
+    // Process and persist tier config instancePartitions
+    if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+        && tableConfig.getInstanceAssignmentConfigMap() != null) {
+      for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+        if (tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName())) {
+          if (override || InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+              InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tierConfig.getName()))
+              == null) {
+            LOGGER.info("Calculating instance partitions for tier: {}, table : {}", tierConfig.getName(),
+                tableNameWithType);
+            InstancePartitions instancePartitions =
+                instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null,
+                    tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
+            LOGGER.info("Persisting instance partitions: {}", instancePartitions);
+            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
+          }
+        }
+      }
+    }
   }
 
   public void updateUserConfig(UserConfig userConfig)
@@ -1909,6 +1930,10 @@ public class PinotHelixResourceManager {
     InstancePartitionsUtils.removeInstancePartitions(_propertyStore, offlineTableName);
     LOGGER.info("Deleting table {}: Removed instance partitions", offlineTableName);
 
+    // Remove tier instance partitions
+    InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, offlineTableName);
+    LOGGER.info("Deleting table {}: Removed tier instance partitions", offlineTableName);
+
     // Remove segment lineage
     SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, offlineTableName);
     LOGGER.info("Deleting table {}: Removed segment lineage", offlineTableName);
@@ -1968,6 +1993,9 @@ public class PinotHelixResourceManager {
         InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
     LOGGER.info("Deleting table {}: Removed instance partitions", realtimeTableName);
 
+    InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, rawTableName);
+    LOGGER.info("Deleting table {}: Removed tier instance partitions", realtimeTableName);
+
     // Remove segment lineage
     SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, realtimeTableName);
     LOGGER.info("Deleting table {}: Removed segment lineage", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index ba73f7bd6d..7a5c901029 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
@@ -55,15 +56,31 @@ public class InstanceAssignmentDriver {
   public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
-    LOGGER.info("Starting {} instance assignment for table: {}", instancePartitionsType, tableNameWithType);
-
     InstanceAssignmentConfig assignmentConfig =
         InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
+    return getInstancePartitions(
+        instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
+        assignmentConfig, instanceConfigs, existingInstancePartitions);
+  }
+
+  public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) {
+    return getInstancePartitions(
+        InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName),
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions);
+  }
+
+  private InstancePartitions getInstancePartitions(String instancePartitionsName,
+      InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions) {
+    String tableNameWithType = _tableConfig.getTableName();
+    LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
+
     InstanceTagPoolSelector tagPoolSelector =
-        new InstanceTagPoolSelector(assignmentConfig.getTagPoolConfig(), tableNameWithType);
+        new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
     Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
 
-    InstanceConstraintConfig constraintConfig = assignmentConfig.getConstraintConfig();
+    InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
     List<InstanceConstraintApplier> constraintAppliers = new ArrayList<>();
     if (constraintConfig == null) {
       LOGGER.info("No instance constraint is configured, using default hash-based-rotate instance constraint");
@@ -75,10 +92,9 @@ public class InstanceAssignmentDriver {
     }
 
     InstancePartitionSelector instancePartitionSelector =
-        InstancePartitionSelectorFactory.getInstance(assignmentConfig.getPartitionSelector(),
-            assignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions);
-    InstancePartitions instancePartitions = new InstancePartitions(
-        instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
+        InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
+            instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions);
+    InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
     return instancePartitions;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index a2d6d630d0..4234abc5d5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 public class RebalanceResult {
   private final Status _status;
   private final Map<InstancePartitionsType, InstancePartitions> _instanceAssignment;
+  private final Map<String, InstancePartitions> _tierInstanceAssignment;
   private final Map<String, Map<String, String>> _segmentAssignment;
   private final String _description;
 
@@ -38,10 +39,12 @@ public class RebalanceResult {
   public RebalanceResult(@JsonProperty(value = "status", required = true) Status status,
       @JsonProperty(value = "description", required = true) String description,
       @JsonProperty("instanceAssignment") @Nullable Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
+      @JsonProperty("tierInstanceAssignment") @Nullable Map<String, InstancePartitions> tierInstanceAssignment,
       @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment) {
     _status = status;
     _description = description;
     _instanceAssignment = instanceAssignment;
+    _tierInstanceAssignment = tierInstanceAssignment;
     _segmentAssignment = segmentAssignment;
   }
 
@@ -60,6 +63,11 @@ public class RebalanceResult {
     return _instanceAssignment;
   }
 
+  @JsonProperty
+  public Map<String, InstancePartitions> getTierInstanceAssignment() {
+    return _tierInstanceAssignment;
+  }
+
   @JsonProperty
   public Map<String, Map<String, String>> getSegmentAssignment() {
     return _segmentAssignment;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index b8257ed450..218193e210 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -163,13 +163,13 @@ public class TableRebalancer {
           IngestionConfigUtils.getStreamConfigMap(tableConfig)).hasHighLevelConsumerType()) {
         LOGGER.warn("Cannot rebalance table: {} with high-level consumer, aborting the rebalance", tableNameWithType);
         return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot rebalance table with high-level consumer",
-            null, null);
+            null, null, null);
       }
     } catch (Exception e) {
       LOGGER.warn("Caught exception while validating table config for table: {}, aborting the rebalance",
           tableNameWithType, e);
       return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while validating table config: " + e,
-          null, null);
+          null, null, null);
     }
 
     // Fetch ideal state
@@ -181,16 +181,17 @@ public class TableRebalancer {
       LOGGER.warn("Caught exception while fetching IdealState for table: {}, aborting the rebalance", tableNameWithType,
           e);
       return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while fetching IdealState: " + e,
-          null, null);
+          null, null, null);
     }
     if (currentIdealState == null) {
       LOGGER.warn("Cannot find the IdealState for table: {}, aborting the rebalance", tableNameWithType);
-      return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find the IdealState for table", null, null);
+      return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find the IdealState for table", null, null,
+          null);
     }
     if (!currentIdealState.isEnabled() && !downtime) {
       LOGGER.warn("Cannot rebalance disabled table: {} without downtime, aborting the rebalance", tableNameWithType);
       return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot rebalance disabled table without downtime",
-          null, null);
+          null, null, null);
     }
 
     LOGGER.info("Fetching/computing instance partitions, reassigning instances if configured for table: {}",
@@ -205,13 +206,13 @@ public class TableRebalancer {
           "Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance",
           tableNameWithType, e);
       return new RebalanceResult(RebalanceResult.Status.FAILED,
-          "Caught exception while fetching/calculating instance partitions: " + e, null, null);
+          "Caught exception while fetching/calculating instance partitions: " + e, null, null, null);
     }
 
     // Calculate instance partitions for tiers if configured
     List<Tier> sortedTiers = getSortedTiers(tableConfig);
     Map<String, InstancePartitions> tierToInstancePartitionsMap =
-        getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
+        getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun);
 
     LOGGER.info("Calculating the target assignment for table: {}", tableNameWithType);
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -224,7 +225,8 @@ public class TableRebalancer {
       LOGGER.warn("Caught exception while calculating target assignment for table: {}, aborting the rebalance",
           tableNameWithType, e);
       return new RebalanceResult(RebalanceResult.Status.FAILED,
-          "Caught exception while calculating target assignment: " + e, instancePartitionsMap, null);
+          "Caught exception while calculating target assignment: " + e, instancePartitionsMap,
+          tierToInstancePartitionsMap, null);
     }
 
     if (currentAssignment.equals(targetAssignment)) {
@@ -233,20 +235,21 @@ public class TableRebalancer {
         if (dryRun) {
           return new RebalanceResult(RebalanceResult.Status.DONE,
               "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap,
-              targetAssignment);
+              tierToInstancePartitionsMap, targetAssignment);
         } else {
           return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced",
-              instancePartitionsMap, targetAssignment);
+              instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
         }
       } else {
         return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap,
-            targetAssignment);
+            tierToInstancePartitionsMap, targetAssignment);
       }
     }
 
     if (dryRun) {
       LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target assignment", tableNameWithType);
-      return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap, targetAssignment);
+      return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap,
+          tierToInstancePartitionsMap, targetAssignment);
     }
 
     if (downtime) {
@@ -267,12 +270,13 @@ public class TableRebalancer {
             System.currentTimeMillis() - startTimeMs);
         return new RebalanceResult(RebalanceResult.Status.DONE,
             "Success with downtime (replaced IdealState with the target segment assignment, ExternalView might not "
-                + "reach the target segment assignment yet)", instancePartitionsMap, targetAssignment);
+                + "reach the target segment assignment yet)", instancePartitionsMap, tierToInstancePartitionsMap,
+            targetAssignment);
       } catch (Exception e) {
         LOGGER.warn("Caught exception while updating IdealState for table: {}, aborting the rebalance",
             tableNameWithType, e);
         return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e,
-            instancePartitionsMap, targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
       }
     }
 
@@ -295,7 +299,7 @@ public class TableRebalancer {
                 + "replicas: {}, aborting the rebalance", minReplicasToKeepUpForNoDowntime, tableNameWithType,
             numReplicas);
         return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min available replicas config",
-            instancePartitionsMap, targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
       }
       minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
     } else {
@@ -321,7 +325,7 @@ public class TableRebalancer {
             tableNameWithType, e);
         return new RebalanceResult(RebalanceResult.Status.FAILED,
             "Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap,
-            targetAssignment);
+            tierToInstancePartitionsMap, targetAssignment);
       }
 
       // Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge
@@ -353,7 +357,8 @@ public class TableRebalancer {
           try {
             // Re-calculate the instance partitions in case the instance configs changed during the rebalance
             instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false);
-            tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
+            tierToInstancePartitionsMap =
+                getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun);
             targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
                 tierToInstancePartitionsMap, rebalanceConfig);
           } catch (Exception e) {
@@ -362,7 +367,7 @@ public class TableRebalancer {
                 tableNameWithType, e);
             return new RebalanceResult(RebalanceResult.Status.FAILED,
                 "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
-                targetAssignment);
+                tierToInstancePartitionsMap, targetAssignment);
           }
         } else {
           LOGGER.info(
@@ -384,7 +389,7 @@ public class TableRebalancer {
         return new RebalanceResult(RebalanceResult.Status.DONE,
             "Success with minAvailableReplicas: " + minAvailableReplicas
                 + " (both IdealState and ExternalView should reach the target segment assignment)",
-            instancePartitionsMap, targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
       }
 
       Map<String, Map<String, String>> nextAssignment =
@@ -412,7 +417,7 @@ public class TableRebalancer {
         LOGGER.warn("Caught exception while updating IdealState for table: {}, aborting the rebalance",
             tableNameWithType, e);
         return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e,
-            instancePartitionsMap, targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
       }
     }
   }
@@ -524,31 +529,73 @@ public class TableRebalancer {
   }
 
   @Nullable
-  private Map<String, InstancePartitions> getTierToInstancePartitionsMap(String tableNameWithType,
-      @Nullable List<Tier> sortedTiers) {
+  private Map<String, InstancePartitions> getTierToInstancePartitionsMap(TableConfig tableConfig,
+      @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean bootstrap, boolean dryRun) {
     if (sortedTiers == null) {
       return null;
     }
     Map<String, InstancePartitions> tierToInstancePartitionsMap = new HashMap<>();
     for (Tier tier : sortedTiers) {
       LOGGER.info("Fetching/computing instance partitions for tier: {} of table: {}", tier.getName(),
-          tableNameWithType);
-      tierToInstancePartitionsMap.put(tier.getName(), getInstancePartitionsForTier(tier, tableNameWithType));
+          tableConfig.getTableName());
+      tierToInstancePartitionsMap.put(tier.getName(),
+          getInstancePartitionsForTier(tableConfig, tier, reassignInstances, bootstrap, dryRun));
     }
     return tierToInstancePartitionsMap;
   }
 
   /**
-   * Creates a default instance assignment for the tier.
-   * TODO: We only support default server-tag based assignment currently.
-   *  In next iteration, we will add InstanceAssignmentConfig to the TierConfig and also support persisting of the
-   *  InstancePartitions to zk.
-   *  Then we'll be able to support replica group assignment while creating InstancePartitions for tiers
+   * Computes the instance partitions for the given tier. If the table's instanceAssignmentConfigMap has an entry for
+   * the tier, it is used to calculate the instance partitions; otherwise the default instance partitions are returned.
    */
-  private InstancePartitions getInstancePartitionsForTier(Tier tier, String tableNameWithType) {
+  private InstancePartitions getInstancePartitionsForTier(TableConfig tableConfig, Tier tier, boolean reassignInstances,
+      boolean bootstrap, boolean dryRun) {
     PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage();
-    return InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableNameWithType,
-        tier.getName(), storage.getServerTag());
+    InstancePartitions defaultInstancePartitions =
+        InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableConfig.getTableName(),
+            tier.getName(), storage.getServerTag());
+
+    if (tableConfig.getInstanceAssignmentConfigMap() == null || !tableConfig.getInstanceAssignmentConfigMap()
+        .containsKey(tier.getName())) {
+      LOGGER.info("Skipping fetching/computing instance partitions for tier {} for table: {}", tier.getName(),
+          tableConfig.getTableName());
+      if (!dryRun) {
+        String instancePartitionsName =
+            InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName());
+        LOGGER.info("Removing instance partitions: {} from ZK if it exists", instancePartitionsName);
+        InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
+      }
+      return defaultInstancePartitions;
+    }
+
+    String tableNameWithType = tableConfig.getTableName();
+    String instancePartitionsName =
+        InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName());
+    if (reassignInstances) {
+      // Set existing instance partition to null if bootstrap mode is enabled, so that the instance partition
+      // map can be fully recalculated.
+      InstancePartitions existingInstancePartitions = bootstrap ? null
+          : InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+              instancePartitionsName);
+      InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
+      InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(tier.getName(),
+          _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
+          existingInstancePartitions, tableConfig.getInstanceAssignmentConfigMap().get(tier.getName()));
+      if (!dryRun) {
+        LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
+        InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
+      }
+      return instancePartitions;
+    }
+
+    InstancePartitions instancePartitions =
+        InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+            InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tier.getName()));
+    if (instancePartitions != null) {
+      return instancePartitions;
+    }
+
+    return defaultInstancePartitions;
   }
 
   private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index 38334bb25a..ef95da5135 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -21,15 +21,19 @@ package org.apache.pinot.controller.api;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -59,6 +63,8 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
 
+  private static final String TIER_NAME = "tier1";
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -114,13 +120,13 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
         new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
     offlineTableConfig.setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE, offlineInstanceAssignmentConfig));
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(offlineTableConfig);
 
     // OFFLINE instance partitions should be generated
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = getInstancePartitionsMap();
+    Map<String, InstancePartitions> instancePartitionsMap = getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 1);
-    InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
     assertNotNull(offlineInstancePartitions);
     assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
@@ -132,72 +138,108 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
         new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
     realtimeTableConfig.setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingInstanceAssignmentConfig));
+        Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
 
     // CONSUMING instance partitions should be generated
     instancePartitionsMap = getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 2);
-    offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
     assertNotNull(offlineInstancePartitions);
     assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
-    InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    InstancePartitions consumingInstancePartitions =
+        instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
     assertNotNull(consumingInstancePartitions);
     assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
     assertEquals(consumingInstancePartitions.getInstances(0, 0).size(), 1);
     String consumingInstanceId = consumingInstancePartitions.getInstances(0, 0).get(0);
 
+    // Add tier config and tier instance assignment config to the offline table config
+    offlineTableConfig.setTierConfigsList(Collections.singletonList(
+        new TierConfig(TIER_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), null,
+            null)));
+    InstanceAssignmentConfig tierInstanceAssignmentConfig = new InstanceAssignmentConfig(
+        new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>();
+    instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig);
+    instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
+    offlineTableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap);
+    _helixResourceManager.setExistingTableConfig(offlineTableConfig);
+
+    // tier instance partitions should be generated
+    Map<String, InstancePartitions> tierInstancePartitionsMap = getInstancePartitionsMap();
+    assertEquals(tierInstancePartitionsMap.size(), 3);
+    InstancePartitions tierInstancePartitions = tierInstancePartitionsMap.get(TIER_NAME);
+    assertNotNull(tierInstancePartitions);
+    assertEquals(tierInstancePartitions.getNumReplicaGroups(), 1);
+    assertEquals(tierInstancePartitions.getNumPartitions(), 1);
+    assertEquals(tierInstancePartitions.getInstances(0, 0).size(), 1);
+
     // Use OFFLINE instance assignment config as the COMPLETED instance assignment config
-    realtimeTableConfig.setInstanceAssignmentConfigMap(
-        new TreeMap<InstancePartitionsType, InstanceAssignmentConfig>() {{
-          put(InstancePartitionsType.CONSUMING, consumingInstanceAssignmentConfig);
-          put(InstancePartitionsType.COMPLETED, offlineInstanceAssignmentConfig);
-        }});
+    realtimeTableConfig.setInstanceAssignmentConfigMap(new TreeMap<String, InstanceAssignmentConfig>() {{
+      put(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig);
+      put(InstancePartitionsType.COMPLETED.toString(), offlineInstanceAssignmentConfig);
+    }});
     _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
 
     // COMPLETED instance partitions should be generated
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 3);
-    offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    assertEquals(instancePartitionsMap.size(), 4);
+    offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
     assertNotNull(offlineInstancePartitions);
     assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
-    consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
     assertNotNull(consumingInstancePartitions);
     assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
     assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(consumingInstanceId));
-    InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+    InstancePartitions completedInstancePartitions =
+        instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString());
     assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(completedInstancePartitions.getNumPartitions(), 1);
     assertEquals(completedInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
+    InstancePartitions tInstancePartitions = instancePartitionsMap.get(TIER_NAME);
+    assertEquals(tInstancePartitions.getNumReplicaGroups(), 1);
+    assertEquals(tInstancePartitions.getNumPartitions(), 1);
+    assertEquals(tInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
 
     // Test fetching instance partitions by table name with type suffix
     instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
         _controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME),
             null)));
-    assertEquals(instancePartitionsMap.size(), 1);
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+    assertEquals(instancePartitionsMap.size(), 2);
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+    assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
     instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
         _controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
             null)));
     assertEquals(instancePartitionsMap.size(), 2);
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING.toString()));
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
 
     // Test fetching instance partitions by table name and instance partitions type
     for (InstancePartitionsType instancePartitionsType : InstancePartitionsType.values()) {
-      instancePartitionsMap = deserializeInstancePartitionsMap(
-          sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, instancePartitionsType)));
+      instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
+          _controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, instancePartitionsType.toString())));
       assertEquals(instancePartitionsMap.size(), 1);
-      assertEquals(instancePartitionsMap.get(instancePartitionsType).getInstancePartitionsName(),
+      assertEquals(instancePartitionsMap.get(instancePartitionsType.toString()).getInstancePartitionsName(),
           instancePartitionsType.getInstancePartitionsName(RAW_TABLE_NAME));
     }
 
+    // Test fetching instance partitions by table name and tier name
+    instancePartitionsMap = deserializeInstancePartitionsMap(
+        sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, TIER_NAME)));
+    assertEquals(instancePartitionsMap.size(), 1);
+    assertEquals(instancePartitionsMap.get(TIER_NAME).getInstancePartitionsName(),
+        InstancePartitionsUtils.getInstancePartitionsNameForTier(RAW_TABLE_NAME, TIER_NAME));
+
     // Remove the instance partitions for both offline and real-time table
     sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null));
     try {
@@ -210,21 +252,25 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     // Assign instances without instance partitions type (dry run)
     instancePartitionsMap = deserializeInstancePartitionsMap(
         sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, null, true), null));
-    assertEquals(instancePartitionsMap.size(), 3);
-    offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    assertEquals(instancePartitionsMap.size(), 4);
+    offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
     assertNotNull(offlineInstancePartitions);
     assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
-    consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
     assertNotNull(consumingInstancePartitions);
     assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
     assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(consumingInstanceId));
-    completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+    completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString());
     assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(completedInstancePartitions.getNumPartitions(), 1);
     assertEquals(completedInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
+    tInstancePartitions = instancePartitionsMap.get(TIER_NAME);
+    assertEquals(tInstancePartitions.getNumReplicaGroups(), 1);
+    assertEquals(tInstancePartitions.getNumPartitions(), 1);
+    assertEquals(tInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
 
     // Instance partitions should not be persisted
     try {
@@ -239,34 +285,36 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
 
     // Instance partitions should be persisted
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 3);
+    assertEquals(instancePartitionsMap.size(), 4);
 
     // Remove the instance partitions for real-time table
     sendDeleteRequest(
         _controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
             null));
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 1);
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+    assertEquals(instancePartitionsMap.size(), 2);
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+    assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
 
     // Assign instances for COMPLETED segments
     instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
         _controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, InstancePartitionsType.COMPLETED, false), null));
     assertEquals(instancePartitionsMap.size(), 1);
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
 
     // There should be OFFLINE and COMPLETED instance partitions persisted
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 2);
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+    assertEquals(instancePartitionsMap.size(), 3);
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+    assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
 
     // Replace OFFLINE instance with CONSUMING instance for COMPLETED instance partitions
     instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
         _controllerRequestURLBuilder.forInstanceReplace(RAW_TABLE_NAME, InstancePartitionsType.COMPLETED,
             offlineInstanceId, consumingInstanceId), null));
     assertEquals(instancePartitionsMap.size(), 1);
-    assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0, 0),
+    assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString()).getInstances(0, 0),
         Collections.singletonList(consumingInstanceId));
 
     // Replace the instance again using real-time table name (old instance does not exist)
@@ -284,26 +332,27 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
         sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null),
             consumingInstancePartitions.toJsonString()));
     assertEquals(instancePartitionsMap.size(), 1);
-    assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0),
+    assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString()).getInstances(0, 0),
         Collections.singletonList(consumingInstanceId));
 
     // OFFLINE instance partitions should have OFFLINE instance, CONSUMING and COMPLETED instance partitions should have
     // CONSUMING instance
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 3);
-    assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE).getInstances(0, 0),
+    assertEquals(instancePartitionsMap.size(), 4);
+    assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString()).getInstances(0, 0),
         Collections.singletonList(offlineInstanceId));
-    assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0),
+    assertEquals(instancePartitionsMap.get(TIER_NAME).getInstances(0, 0), Collections.singletonList(offlineInstanceId));
+    assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString()).getInstances(0, 0),
         Collections.singletonList(consumingInstanceId));
-    assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0, 0),
+    assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString()).getInstances(0, 0),
         Collections.singletonList(consumingInstanceId));
 
     // Delete the offline table
     _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
     instancePartitionsMap = getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 2);
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
-    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING.toString()));
+    assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
 
     // Delete the real-time table
     _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
@@ -315,18 +364,16 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     }
   }
 
-  private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap()
+  private Map<String, InstancePartitions> getInstancePartitionsMap()
       throws Exception {
     return deserializeInstancePartitionsMap(
         sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null)));
   }
 
-  private Map<InstancePartitionsType, InstancePartitions> deserializeInstancePartitionsMap(
-      String instancePartitionsMapString)
+  private Map<String, InstancePartitions> deserializeInstancePartitionsMap(String instancePartitionsMapString)
       throws Exception {
-    return JsonUtils.stringToObject(instancePartitionsMapString,
-        new TypeReference<Map<InstancePartitionsType, InstancePartitions>>() {
-        });
+    return JsonUtils.stringToObject(instancePartitionsMapString, new TypeReference<Map<String, InstancePartitions>>() {
+    });
   }
 
   @AfterClass
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 4715271fc5..53bd2da317 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -352,7 +352,7 @@ public class InstanceAssignmentTest {
     InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false);
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-        .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))).build();
     InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -405,7 +405,7 @@ public class InstanceAssignmentTest {
 
     // Select all 3 pools in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
@@ -427,7 +427,7 @@ public class InstanceAssignmentTest {
 
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -449,7 +449,7 @@ public class InstanceAssignmentTest {
     // Assign instances from 2 pools to 3 replica-groups
     numReplicaGroups = numPools;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -479,7 +479,7 @@ public class InstanceAssignmentTest {
     numPools = 2;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
     // Reset the instance configs to have only two pools.
     instanceConfigs.clear();
@@ -528,7 +528,7 @@ public class InstanceAssignmentTest {
 
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
 
     // Get the latest existingInstancePartitions from last computation.
@@ -555,7 +555,7 @@ public class InstanceAssignmentTest {
     // Assign instances from 2 pools to 3 replica-groups
     numReplicaGroups = 3;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
 
     // Get the latest existingInstancePartitions from last computation.
@@ -634,7 +634,7 @@ public class InstanceAssignmentTest {
     // Reduce number of replica groups from 3 to 2.
     numReplicaGroups = 2;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
 
     // Get the latest existingInstancePartitions from last computation.
@@ -761,7 +761,7 @@ public class InstanceAssignmentTest {
     InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, false, 0, null);
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // No instance with correct tag
@@ -791,7 +791,7 @@ public class InstanceAssignmentTest {
 
     // Enable pool
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // No instance has correct pool configured
@@ -825,7 +825,7 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(0, 0), expectedInstances);
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // Ask for too many pools
@@ -837,7 +837,7 @@ public class InstanceAssignmentTest {
     }
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 2));
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // Ask for pool that does not exist
@@ -850,7 +850,7 @@ public class InstanceAssignmentTest {
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // Ask for too many instances
@@ -863,7 +863,7 @@ public class InstanceAssignmentTest {
 
     // Enable replica-group
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // Number of replica-groups must be positive
@@ -875,7 +875,7 @@ public class InstanceAssignmentTest {
     }
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // Ask for too many replica-groups
@@ -888,7 +888,7 @@ public class InstanceAssignmentTest {
     }
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // Ask for too many instances
@@ -900,7 +900,7 @@ public class InstanceAssignmentTest {
     }
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // Ask for too many instances per partition
@@ -913,7 +913,7 @@ public class InstanceAssignmentTest {
     }
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
@@ -952,7 +952,7 @@ public class InstanceAssignmentTest {
 
     try {
       tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-          .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+          .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
               new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build();
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
@@ -979,7 +979,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
                 InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -1011,7 +1011,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
                 InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -1051,7 +1051,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
                 InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -1089,7 +1089,7 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
             numInstancesPerPartition, false);
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-        .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
                 InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
@@ -1161,7 +1161,7 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
             numInstancesPerPartition, true);
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
                 InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -1242,7 +1242,7 @@ public class InstanceAssignmentTest {
     SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(
         Collections.singletonMap(partitionColumnName, new ColumnPartitionConfig("Modulo", numPartitionsSegment, null)));
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-            Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
                     InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup))
@@ -1316,7 +1316,7 @@ public class InstanceAssignmentTest {
         new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
                     InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1372,7 +1372,7 @@ public class InstanceAssignmentTest {
     instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
                     InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1439,7 +1439,7 @@ public class InstanceAssignmentTest {
     // Do not rotate pool sequence (for testing)
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
                     InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1505,7 +1505,7 @@ public class InstanceAssignmentTest {
     // Do not rotate pool sequence (for testing)
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
                     InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1576,7 +1576,7 @@ public class InstanceAssignmentTest {
     instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
                     InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1627,7 +1627,7 @@ public class InstanceAssignmentTest {
     // Do not rotate pool sequence (for testing)
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
                     InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1691,7 +1691,7 @@ public class InstanceAssignmentTest {
     // Do not rotate pool sequence (for testing)
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
                     InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 4c006967d1..4b9e869dcf 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -196,7 +196,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
         new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null);
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false);
-    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
@@ -403,6 +403,131 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
         assertTrue(instance.startsWith(expectedPrefix));
       }
     }
+    _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
+  }
+
+  @Test
+  public void testRebalanceWithTiersAndInstanceAssignments()
+      throws Exception {
+    int numServers = 3;
+    for (int i = 0; i < numServers; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(
+          "replicaAssignment" + NO_TIER_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + i, false);
+    }
+    _helixResourceManager.createServerTenant(
+        new Tenant(TenantRole.SERVER, "replicaAssignment" + NO_TIER_NAME, numServers, numServers, 0));
+
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+            .setServerTenant("replicaAssignment" + NO_TIER_NAME).build();
+    // Create the table
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME, SEGMENT_NAME_PREFIX + i,
+              nowInDays), null);
+    }
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        _helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // add 6 nodes tierA
+    for (int i = 0; i < 6; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(
+          "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + i, false);
+    }
+    _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, "replicaAssignment" + TIER_A_NAME, 6, 6, 0));
+    // rebalance is NOOP and no change in assignment caused by new instances
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // add tier config
+    tableConfig.setTierConfigsList(Lists.newArrayList(
+        new TierConfig(TIER_A_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "0d", null,
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, "replicaAssignment" + TIER_A_NAME + "_OFFLINE", null, null)));
+    _helixResourceManager.updateTableConfig(tableConfig);
+
+    // rebalance should change assignment
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+    // check that segments have moved to tier a
+    Map<String, Map<String, String>> tierSegmentAssignment = rebalanceResult.getSegmentAssignment();
+    for (Map.Entry<String, Map<String, String>> entry : tierSegmentAssignment.entrySet()) {
+      Map<String, String> instanceStateMap = entry.getValue();
+      for (String instance : instanceStateMap.keySet()) {
+        assertTrue(instance.startsWith("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX));
+      }
+    }
+
+    // Test rebalance with tier instance assignment
+    InstanceTagPoolConfig tagPoolConfig =
+        new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME), false, 0,
+            null);
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+    _helixResourceManager.updateTableConfig(tableConfig);
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
+
+    InstancePartitions instancePartitions = rebalanceResult.getTierInstanceAssignment().get(TIER_A_NAME);
+
+    // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
+    // [i2, i3, i4, i5, i0, i1]
+    //  r0  r1  r2  r0  r1  r2
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 2,
+            "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 3,
+            "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 4,
+            "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 1));
+
+    // The assignment is based on replica-group 0 and mirrored to all the replica-groups, so servers of index 0, 1, 5
+    // should have the same segments assigned, and servers of index 2, 3, 4 should have the same segments assigned, each
+    // with 5 segments
+    Map<String, Map<String, String>> newSegmentAssignment = rebalanceResult.getSegmentAssignment();
+    int numSegmentsOnServer0 = 0;
+    for (int i = 0; i < numSegments; i++) {
+      String segmentName = SEGMENT_NAME_PREFIX + i;
+      Map<String, String> instanceStateMap = newSegmentAssignment.get(segmentName);
+      assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+      if (instanceStateMap.containsKey("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 0)) {
+        numSegmentsOnServer0++;
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 0),
+            ONLINE);
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 1),
+            ONLINE);
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 5),
+            ONLINE);
+      } else {
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 2),
+            ONLINE);
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 3),
+            ONLINE);
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 4),
+            ONLINE);
+      }
+    }
+    assertEquals(numSegmentsOnServer0, numSegments / 2);
 
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 046c0175e7..361999a881 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -595,7 +595,8 @@ public final class TableConfigUtils {
       return;
     }
     for (InstancePartitionsType instancePartitionsType : tableConfig.getInstancePartitionsMap().keySet()) {
-      Preconditions.checkState(!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType),
+      Preconditions.checkState(
+          !tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()),
           String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s",
               instancePartitionsType));
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 88a829d846..ee14ef2b4e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1724,10 +1724,10 @@ public class TableConfigUtilsTest {
     // Call validate with a table-config with instance partitions set but not instance assignment config
     TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithInstancePartitionsMap);
 
-    TableConfig invalidTableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-            .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
-            .setInstanceAssignmentConfigMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, instanceAssignmentConfig))
+    TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
+        .setInstanceAssignmentConfigMap(
+            ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig))
             .build();
     try {
       // Call validate with instance partitions and config set for the same type
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index bae7b7c798..91a54bc942 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -89,7 +89,7 @@ public class TableConfig extends BaseJsonConfig {
   private TableTaskConfig _taskConfig;
   private RoutingConfig _routingConfig;
   private QueryConfig _queryConfig;
-  private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
+  private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
 
   @JsonPropertyDescription(value = "Point to an existing instance partitions")
   private Map<InstancePartitionsType, String> _instancePartitionsMap;
@@ -128,7 +128,7 @@ public class TableConfig extends BaseJsonConfig {
       @JsonProperty(ROUTING_CONFIG_KEY) @Nullable RoutingConfig routingConfig,
       @JsonProperty(QUERY_CONFIG_KEY) @Nullable QueryConfig queryConfig,
       @JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable
-          Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap,
+          Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap,
       @JsonProperty(FIELD_CONFIG_LIST_KEY) @Nullable List<FieldConfig> fieldConfigList,
       @JsonProperty(UPSERT_CONFIG_KEY) @Nullable UpsertConfig upsertConfig,
       @JsonProperty(DEDUP_CONFIG_KEY) @Nullable DedupConfig dedupConfig,
@@ -267,12 +267,12 @@ public class TableConfig extends BaseJsonConfig {
 
   @JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY)
   @Nullable
-  public Map<InstancePartitionsType, InstanceAssignmentConfig> getInstanceAssignmentConfigMap() {
+  public Map<String, InstanceAssignmentConfig> getInstanceAssignmentConfigMap() {
     return _instanceAssignmentConfigMap;
   }
 
   public void setInstanceAssignmentConfigMap(
-      Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
+      Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
     _instanceAssignmentConfigMap = instanceAssignmentConfigMap;
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index a687a3eafd..2a00acc28f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -401,7 +401,7 @@ public class ControllerRequestURLBuilder {
     return url;
   }
 
-  public String forInstancePartitions(String tableName, @Nullable InstancePartitionsType instancePartitionsType) {
+  public String forInstancePartitions(String tableName, @Nullable String instancePartitionsType) {
     String url = StringUtil.join("/", _baseUrl, "tables", tableName, "instancePartitions");
     if (instancePartitionsType != null) {
       url += "?type=" + instancePartitionsType;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 4efa7db452..4916668eed 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -109,7 +109,7 @@ public class TableConfigBuilder {
   private TableTaskConfig _taskConfig;
   private RoutingConfig _routingConfig;
   private QueryConfig _queryConfig;
-  private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
+  private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
   private Map<InstancePartitionsType, String> _instancePartitionsMap;
   private Map<String, SegmentAssignmentConfig> _segmentAssignmentConfigMap;
   private List<FieldConfig> _fieldConfigList;
@@ -344,7 +344,7 @@ public class TableConfigBuilder {
   }
 
   public TableConfigBuilder setInstanceAssignmentConfigMap(
-      Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
+      Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
     _instanceAssignmentConfigMap = instanceAssignmentConfigMap;
     return this;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org