You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2024/02/13 17:53:46 UTC

(pinot) 01/02: Initial POC code

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 9864022a2f0e9a7a8c5f2e3e6ab07ae89f658497
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Wed Jan 17 13:31:58 2024 -0800

    Initial POC code
---
 .../common/assignment/InstancePartitionsUtils.java |  11 +-
 .../pinot/common/utils/helix/HelixHelper.java      |  10 +-
 .../pinot/controller/LeadControllerManager.java    |   1 +
 .../PinotInstanceAssignmentRestletResource.java    |   4 +-
 .../api/resources/PinotTenantRestletResource.java  |   3 +-
 ...ineSegmentOnlineOfflineStateModelGenerator.java |  65 +++++++++
 .../helix/core/PinotHelixResourceManager.java      |  46 ++++--
 ...lixSegmentOnlineOfflineStateModelGenerator.java |   2 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  30 ++++
 .../assignment/segment/SegmentAssignmentUtils.java |   8 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  50 ++++---
 .../helix/core/rebalance/TableRebalancer.java      |  46 +++++-
 .../helix/core/util/HelixSetupUtils.java           |  19 +++
 .../PinotLLCRealtimeSegmentManagerTest.java        |   6 +-
 .../integration/tests/HelixZNodeSizeLimitTest.java |  19 ++-
 .../server/starter/helix/BaseServerStarter.java    |   9 +-
 ...flineSegmentOnlineOfflineStateModelFactory.java | 162 +++++++++++++++++++++
 .../SegmentOnlineOfflineStateModelFactory.java     |  38 +++++
 .../org/apache/pinot/tools/HybridQuickstart.java   |   5 +-
 .../tools/admin/command/MoveReplicaGroup.java      |  17 ++-
 .../tools/admin/command/QuickstartRunner.java      |   2 +-
 .../airlineStats_offline_table_config.json         |   4 +-
 22 files changed, 501 insertions(+), 56 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
index 759d387af4..f8bbd08934 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
@@ -23,7 +23,9 @@ import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -165,12 +167,19 @@ public class InstancePartitionsUtils {
    * Persists the instance partitions to Helix property store.
    */
   public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
-      InstancePartitions instancePartitions) {
+      ConfigAccessor configAccessor, String helixClusterName, InstancePartitions instancePartitions) {
     String path = ZKMetadataProvider
         .constructPropertyStorePathForInstancePartitions(instancePartitions.getInstancePartitionsName());
     if (!propertyStore.set(path, instancePartitions.toZNRecord(), AccessOption.PERSISTENT)) {
       throw new ZkException("Failed to persist instance partitions: " + instancePartitions);
     }
+
+    // Set the INSTANCE_PARTITIONS under the RESOURCE config (only modifying set path for now to ensure it works)
+    // This is just a test to see how to access and update the CONFIG/RESOURCES
+    String resourceName = "INSTANCE_PARTITION_" + instancePartitions.getInstancePartitionsName();
+    ResourceConfig resourceConfig = new ResourceConfig(resourceName);
+    resourceConfig.setPreferenceLists(instancePartitions.getPartitionToInstancesMap());
+    configAccessor.setResourceConfig(helixClusterName, resourceName, resourceConfig);
   }
 
   /**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index 4160dd44ef..8f76ba801f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -185,9 +185,12 @@ public class HelixHelper {
             String partitionName = it.next();
             int numChars = partitionName.length();
             Map<String, String> stateMap = is.getInstanceStateMap(partitionName);
-            for (Map.Entry<String, String> entry : stateMap.entrySet()) {
-              numChars += entry.getKey().length();
-              numChars += entry.getValue().length();
+            if (stateMap != null) {
+              // The stateMap might be NULL for FULL-AUTO segments, so always do this NULL check
+              for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+                numChars += entry.getKey().length();
+                numChars += entry.getValue().length();
+              }
             }
             numChars *= is.getNumPartitions();
             if (_minNumCharsInISToTurnOnCompression > 0
@@ -200,6 +203,7 @@ public class HelixHelper {
       });
       return idealStateWrapper._idealState;
     } catch (Exception e) {
+      LOGGER.error("Caught exception while updating ideal state for resource: " + resourceName, e);
       throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
index 5c595bae2a..a1a4f7c270 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
@@ -220,6 +220,7 @@ public class LeadControllerManager {
       return;
     }
 
+    LOGGER.info("************** onResourceConfigChange() fired ***************");
     boolean leadControllerResourceEnabled;
     try {
       leadControllerResourceEnabled = LeadControllerUtils.isLeadControllerResourceEnabled(_helixManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 282431e04b..6fd157d620 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -301,7 +301,9 @@ public class PinotInstanceAssignmentRestletResource {
   private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
     try {
       LOGGER.info("Persisting instance partitions: {}", instancePartitions);
-      InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(), instancePartitions);
+      InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(),
+          _resourceManager.getHelixZkManager().getConfigAccessor(), _resourceManager.getHelixClusterName(),
+          instancePartitions);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
           Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 8166427a93..1fea68c75f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -359,7 +359,8 @@ public class PinotTenantRestletResource {
     try {
       LOGGER.info("Persisting instance partitions: {}", instancePartitions);
       InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
-          instancePartitions);
+          _pinotHelixResourceManager.getHelixZkManager().getConfigAccessor(),
+          _pinotHelixResourceManager.getHelixClusterName(), instancePartitions);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
           Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java
new file mode 100644
index 0000000000..e4f93b3a52
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * Offline Segment state model generator describes the transitions for offline segment states.
+ *
+ * Online to Offline (Online reaches Dropped only via Offline; no direct transition is declared)
+ * Offline to Online, Offline to Dropped
+ *
+ * This does not include the state transitions for realtime segments (which includes the CONSUMING state)
+ */
+public class PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator {
+  private PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator() {
+  }
+
+  public static final String PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL = "OfflineSegmentOnlineOfflineStateModel";
+
+  public static final String ONLINE_STATE = "ONLINE";
+  public static final String OFFLINE_STATE = "OFFLINE";
+  public static final String DROPPED_STATE = "DROPPED";
+
+  public static StateModelDefinition generatePinotStateModelDefinition() {
+    StateModelDefinition.Builder builder =
+        new StateModelDefinition.Builder(PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL);
+    builder.initialState(OFFLINE_STATE);
+
+    builder.addState(ONLINE_STATE);
+    builder.addState(OFFLINE_STATE);
+    builder.addState(DROPPED_STATE);
+    // Note: the initial state when the node starts is set above via initialState(OFFLINE_STATE)
+
+    // Add transitions between the states.
+    builder.addTransition(OFFLINE_STATE, ONLINE_STATE);
+    builder.addTransition(ONLINE_STATE, OFFLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, DROPPED_STATE);
+
+    // set constraints on states.
+    // dynamic constraint (no static constraint is set for this state model)
+    builder.dynamicUpperBound(ONLINE_STATE, "R");
+    // dynamic constraint, R means it should be derived based on the replication
+    // factor.
+
+    StateModelDefinition statemodelDefinition = builder.build();
+    return statemodelDefinition;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 35c942aba6..ae208715fd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1579,16 +1579,29 @@ public class PinotHelixResourceManager {
     LOGGER.info("Adding table {}: Validate table configs", tableNameWithType);
     validateTableTenantConfig(tableConfig);
 
-    IdealState idealState =
-        PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-            _enableBatchMessageMode);
     TableType tableType = tableConfig.getTableType();
+    Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
+        "Invalid table type: %s", tableType);
+
+    IdealState idealState;
+    if (tableType == TableType.REALTIME) {
+      idealState =
+           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+               _enableBatchMessageMode);
+    } else {
+      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
+      idealState =
+          PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+              _enableBatchMessageMode);
+    }
+   // IdealState idealState =
+   //     PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+   //         _enableBatchMessageMode);
+
     // Ensure that table is not created if schema is not present
     if (ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType)) == null) {
       throw new InvalidTableConfigException("No schema defined for table: " + tableNameWithType);
     }
-    Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
-        "Invalid table type: %s", tableType);
 
     // Add table config
     LOGGER.info("Adding table {}: Creating table config in the property store", tableNameWithType);
@@ -1785,7 +1798,8 @@ public class PinotHelixResourceManager {
                 referenceInstancePartitionsName);
           }
         }
-        InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
+        InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(),
+            _helixClusterName, instancePartitions);
       }
     }
 
@@ -1803,7 +1817,8 @@ public class PinotHelixResourceManager {
                 instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null,
                     tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
             LOGGER.info("Persisting instance partitions: {}", instancePartitions);
-            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
+            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(),
+                _helixClusterName, instancePartitions);
           }
         }
       }
@@ -2243,7 +2258,8 @@ public class PinotHelixResourceManager {
         HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
           assert idealState != null;
           Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
-          if (currentAssignment.containsKey(segmentName)) {
+          Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
+          if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) {
             LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
                 tableNameWithType);
           } else {
@@ -2251,8 +2267,18 @@ public class PinotHelixResourceManager {
                 segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
             LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
                 tableNameWithType);
-            currentAssignment.put(segmentName,
-                SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+            TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+            if (tableType == TableType.REALTIME) {
+              // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated
+              currentAssignment.put(segmentName,
+                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+            } else {
+              // TODO: Assess whether to pass in an empty instance list or to set the preferred list
+              currentAssignmentList.put(segmentName, Collections.emptyList()
+                  /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
+            }
+            // currentAssignment.put(segmentName,
+            //     SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
           }
           return idealState;
         });
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
index 18b6ac0a75..3e52de0359 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
@@ -69,7 +69,7 @@ public class PinotHelixSegmentOnlineOfflineStateModelGenerator {
     builder.dynamicUpperBound(ONLINE_STATE, "R");
     // dynamic constraint, R means it should be derived based on the replication
     // factor.
-    builder.dynamicUpperBound(CONSUMING_STATE, "R");
+    // builder.dynamicUpperBound(CONSUMING_STATE, "R");
 
     StateModelDefinition statemodelDefinition = builder.build();
     return statemodelDefinition;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 23a115417f..63222df7e3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.controller.helix.core;
 
 import java.util.List;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
@@ -41,6 +43,7 @@ public class PinotTableIdealStateBuilder {
 
   public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int numReplicas,
       boolean enableBatchMessageMode) {
+    LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
     CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType);
     customModeIdealStateBuilder
         .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
@@ -51,6 +54,33 @@ public class PinotTableIdealStateBuilder {
     return idealState;
   }
 
+  public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas,
+      boolean enableBatchMessageMode) {
+    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    // FULL-AUTO Segment Online-Offline state model with a rebalance strategy (currently CrushEdRebalanceStrategy)
+    // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too
+    FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
+    idealStateBuilder
+        .setStateModel(
+            PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
+        // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
+        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+    // The config below guarantees that as long as the number of active replicas is no less than the minimum active
+    // replica count, no partition movement will happen.
+    // Set min active replicas to (numReplicas - 1) and rebalance delay to 5 minutes so that if any instance goes
+    // offline, the Helix controller waits at most 5 minutes and then re-calculates the participant assignment.
+    // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added
+    idealStateBuilder.setMinActiveReplica(numReplicas - 1);
+    idealStateBuilder.setRebalanceDelay(300_000);
+    idealStateBuilder.enableDelayRebalance();
+    // Set instance group tag
+    IdealState idealState = idealStateBuilder.build();
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+
   /**
    * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream,
    * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 52f736a555..46e4cc4eca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -272,6 +273,13 @@ public class SegmentAssignmentUtils {
     return instanceStateMap;
   }
 
+  public static List<String> getInstanceStateList(Collection<String> instances) {
+    List<String> instanceStateList = new ArrayList<>();
+    instanceStateList.addAll(instances);
+    Collections.sort(instanceStateList);
+    return instanceStateList;
+  }
+
   /**
    * Returns a map from instance name to number of segments to be moved to it.
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 25e40084ab..003f985e3f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -332,12 +332,13 @@ public class PinotLLCRealtimeSegmentManager {
 
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    Map<String, List<String>> instancesStateList = idealState.getRecord().getListFields();
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
               numPartitionGroups, numReplicas);
-      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
-          instancePartitionsMap);
+      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, segmentName,
+          segmentAssignment, instancePartitionsMap);
     }
 
     setIdealState(realtimeTableName, idealState);
@@ -818,6 +819,7 @@ public class PinotLLCRealtimeSegmentManager {
     try {
       HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
         assert idealState != null;
+        // TODO: how to handle such state updates for FULL-AUTO mode? So far we don't enable FULL-AUTO for REALTIME
         Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName);
         String state = stateMap.get(instanceName);
         if (SegmentStateModel.CONSUMING.equals(state)) {
@@ -972,7 +974,8 @@ public class PinotLLCRealtimeSegmentManager {
         throw new HelixHelper.PermanentUpdaterException(
             "Exceeded max segment completion time for segment " + committingSegmentName);
       }
-      updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
+      updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
+          idealState.getRecord().getListFields(), committingSegmentName,
           isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
@@ -984,8 +987,10 @@ public class PinotLLCRealtimeSegmentManager {
 
   @VisibleForTesting
   void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
+      Map<String, List<String>> instanceStatesList,
       @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    // TODO: Need to figure out the best way to handle committed segments' state change
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
       Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
@@ -1023,8 +1028,15 @@ public class PinotLLCRealtimeSegmentManager {
       // Assign instances to the new segment and add instances as state CONSUMING
       List<String> instancesAssigned =
           segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+      // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
       instanceStatesMap.put(newSegmentName,
           SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+      // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list.
+      //       Assess whether we should set an empty InstanceStateList for the segment or not. i.e. do we support
+      //       this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it
+      //       looks like it does)
+      // instanceStatesList.put(newSegmentName, Collections.emptyList()
+      //    /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
     }
   }
@@ -1116,6 +1128,7 @@ public class PinotLLCRealtimeSegmentManager {
         Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
 
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    Map<String, List<String>> instanceStatesList = idealState.getRecord().getListFields();
     long currentTimeMs = getCurrentTimeMs();
     StreamPartitionMsgOffsetFactory offsetFactory =
         StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
@@ -1181,14 +1194,14 @@ public class PinotLLCRealtimeSegmentManager {
                       (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
               createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
                   committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
-              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
-                  segmentAssignment, instancePartitionsMap);
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName,
+                  newSegmentName, segmentAssignment, instancePartitionsMap);
             } else { // partition group reached end of life
               LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
                       + "Skipping creation of new ZK metadata and new segment in ideal state", partitionGroupId,
                   latestSegmentName);
-              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
-                  instancePartitionsMap);
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName,
+                  null, segmentAssignment, instancePartitionsMap);
             }
           }
           // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
@@ -1212,8 +1225,8 @@ public class PinotLLCRealtimeSegmentManager {
                     partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
                     latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
             createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
-                instancePartitionsMap, startOffset);
+                newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
+                segmentAssignment, instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
               if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
@@ -1231,8 +1244,8 @@ public class PinotLLCRealtimeSegmentManager {
                         partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
                         latestSegmentZKMetadata.getEndOffset());
                 createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                    newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
-                    instancePartitionsMap, startOffset);
+                    newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
+                    segmentAssignment, instancePartitionsMap, startOffset);
               } else {
                 LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
                     latestSegmentName);
@@ -1269,8 +1282,8 @@ public class PinotLLCRealtimeSegmentManager {
                 partitionGroupId, realtimeTableName);
             _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
           }
-          updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName,
-              segmentAssignment, instancePartitionsMap);
+          updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, previousConsumingSegment,
+              latestSegmentName, segmentAssignment, instancePartitionsMap);
         } else {
           LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}",
               latestSegmentZKMetadata.getStatus(), latestSegmentName);
@@ -1285,8 +1298,8 @@ public class PinotLLCRealtimeSegmentManager {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
                 numPartitions, numReplicas);
-        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
-            instancePartitionsMap);
+        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, null, newSegmentName,
+            segmentAssignment, instancePartitionsMap);
       }
     }
 
@@ -1296,7 +1309,8 @@ public class PinotLLCRealtimeSegmentManager {
   private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig,
       SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
       List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
-      Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
+      Map<String, Map<String, String>> instanceStatesMap, Map<String, List<String>> instancesStateList,
+      SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
     int numPartitions = newPartitionGroupMetadataList.size();
@@ -1307,8 +1321,8 @@ public class PinotLLCRealtimeSegmentManager {
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
         latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
     String newSegmentName = newLLCSegmentName.getSegmentName();
-    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
-        instancePartitionsMap);
+    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, newSegmentName,
+        segmentAssignment, instancePartitionsMap);
   }
 
   private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 5c3514fa7e..f97aa7b06a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.rebalance;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -158,6 +159,16 @@ public class TableRebalancer {
     }
   }
 
+  private Map<String, List<String>> convertTargetAssignmentToListFields(
+      Map<String, Map<String, String>> targetAssignment, boolean setInstances) {
+    // Builds listFields keyed like the assignment: each key maps to the instance names of its state map when
+    // setInstances is true, otherwise to an empty list.
+    Map<String, List<String>> listFields = new HashMap<>();
+    targetAssignment.forEach((segmentName, instanceStateMap) -> listFields.put(segmentName,
+        setInstances ? new ArrayList<>(instanceStateMap.keySet()) : Collections.emptyList()));
+    return listFields;
+  }
+
   private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig,
       @Nullable String rebalanceJobId) {
     long startTimeMs = System.currentTimeMillis();
@@ -299,8 +310,19 @@ public class TableRebalancer {
       LOGGER.info("For rebalanceId: {}, rebalancing table: {} with downtime", rebalanceJobId, tableNameWithType);
 
       // Reuse current IdealState to update the IdealState in cluster
+      // TODO: Assess how rebalance will change if we use FULL-AUTO mode. In FULL-AUTO mode the mapFields should not
+      //       be set up at all. Maybe we don't even need this TableRebalancer?
+      TableType tableType = tableConfig.getTableType();
       ZNRecord idealStateRecord = currentIdealState.getRecord();
-      idealStateRecord.setMapFields(targetAssignment);
+      if (tableType == TableType.REALTIME) {
+        // TODO: Only set listFields once REALTIME tables use FULL-AUTO
+        idealStateRecord.setMapFields(targetAssignment);
+      } else {
+        // TODO: Assess whether we should set the preferred host list or not, for now setting empty list
+        Map<String, List<String>> listFieldsConverted = convertTargetAssignmentToListFields(targetAssignment, false);
+        idealStateRecord.setListFields(listFieldsConverted);
+      }
+      // idealStateRecord.setMapFields(targetAssignment);
       currentIdealState.setNumPartitions(targetAssignment.size());
       currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size()));
 
@@ -496,7 +518,18 @@ public class TableRebalancer {
           SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));
 
       // Reuse current IdealState to update the IdealState in cluster
-      idealStateRecord.setMapFields(nextAssignment);
+      // TODO: Assess how rebalance will change if we use FULL-AUTO mode. In FULL-AUTO mode the mapFields should not
+      //       be set up at all. Maybe we don't even need this TableRebalancer?
+      TableType tableType = tableConfig.getTableType();
+      if (tableType == TableType.REALTIME) {
+        // TODO: Only set listFields once REALTIME tables use FULL-AUTO
+        idealStateRecord.setMapFields(nextAssignment);
+      } else {
+        // TODO: Assess whether we should set the preferred host list or not, for now setting empty list
+        Map<String, List<String>> listFieldsConverted = convertTargetAssignmentToListFields(nextAssignment, false);
+        idealStateRecord.setListFields(listFieldsConverted);
+      }
+      // Persist this step's nextAssignment (not targetAssignment), matching numPartitions/replicas set below.
       idealState.setNumPartitions(nextAssignment.size());
       idealState.setReplicas(Integer.toString(nextAssignment.values().iterator().next().size()));
 
@@ -598,7 +631,7 @@ public class TableRebalancer {
           if (!dryRun && !instancePartitionsUnchanged) {
             LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
             InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                instancePartitions);
+                _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
           }
         } else {
           String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
@@ -614,7 +647,7 @@ public class TableRebalancer {
               LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions,
                   preConfiguredInstancePartitions);
               InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                  instancePartitions);
+                  _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
             }
           } else {
             instancePartitions =
@@ -625,7 +658,7 @@ public class TableRebalancer {
               LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
                   referenceInstancePartitionsName);
               InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                  instancePartitions);
+                  _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
             }
           }
         }
@@ -730,7 +763,8 @@ public class TableRebalancer {
         boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
         if (!dryRun && !instancePartitionsUnchanged) {
           LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
-          InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
+          InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
+              _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
         }
         return Pair.of(instancePartitions, instancePartitionsUnchanged);
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 8d21d18b1f..61f6112365 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -31,6 +31,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
@@ -45,6 +46,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
@@ -79,6 +81,8 @@ public class HelixSetupUtils {
             new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
         Map<String, String> configMap = new HashMap<>();
         configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.toString(true));
+        configMap.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
+            Boolean.toString(true));
         configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(DEFAULT_ENABLE_CASE_INSENSITIVE));
         configMap.put(DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M));
         configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.toString(false));
@@ -147,6 +151,21 @@ public class HelixSetupUtils {
       helixDataAccessor
           .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
+
+    // Create the offline segment state model when it is missing, or when isUpdateStateModel is set.
+    String offlineModelName =
+        PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition offlineStateModelDef = helixAdmin.getStateModelDef(helixClusterName, offlineModelName);
+    if (offlineStateModelDef == null || isUpdateStateModel) {
+      // Branch on the offline definition fetched above rather than the unrelated stateModelDefinition variable.
+      if (offlineStateModelDef == null) {
+        LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineModelName);
+      } else {
+        LOGGER.info("Updating offline segment state model: {} to contain CONSUMING state", offlineModelName);
+      }
+      helixDataAccessor.createStateModelDef(
+          PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+    }
   }
 
   private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 60b83ba24a..a882a47ec7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1221,9 +1221,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
     void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
         String newSegmentName, SegmentAssignment segmentAssignment,
         Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null,
+      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
+          _idealState.getRecord().getListFields(), committingSegmentName, null,
           segmentAssignment, instancePartitionsMap);
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName,
+      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
+          _idealState.getRecord().getListFields(), null, newSegmentName,
           segmentAssignment, instancePartitionsMap);
     }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
index 37d3aa6ca3..9dec9df6b6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
@@ -19,7 +19,8 @@
 
 package org.apache.pinot.integration.tests;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
 import java.util.Map;
 import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -83,15 +84,23 @@ public class HelixZNodeSizeLimitTest extends BaseClusterIntegrationTest {
     try {
       HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
         Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+        Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
         for (int i = 0; i < 500_000; i++) {
-          currentAssignment.put("segment_" + i,
-              ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 10), "ONLINE"));
-          currentAssignment.put("segment_" + i,
-              ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 9), "ONLINE"));
+         // currentAssignment.put("segment_" + i,
+         //     ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 10), "ONLINE"));
+         // currentAssignment.put("segment_" + i,
+         //     ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 9), "ONLINE"));
+          currentAssignmentList.put("segment_" + i,
+              ImmutableList.of("Server_with_some_reasonable_long_prefix_" + (i % 10)));
+          currentAssignmentList.put("segment_" + i,
+              ImmutableList.of("Server_with_some_reasonable_long_prefix_" + (i % 9)));
         }
         return idealState;
       });
     } catch (Exception e) {
+      System.out.println("exception: " + e);
+      System.out.println("exception: " + e.getMessage());
+      System.out.println("exception: " + e.getCause());
       Assert.fail("Exception shouldn't be thrown even if the data size of the ideal state is larger than 1M");
     }
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 2b771707d2..6e0d157075 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -579,10 +579,15 @@ public abstract class BaseServerStarter implements ServiceStartable {
     Tracing.ThreadAccountantOps
         .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
     initSegmentFetcher(_serverConf);
-    StateModelFactory<?> stateModelFactory =
+    StateModelFactory<?> stateModelFactoryWithRealtime =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+    StateModelFactory<?> stateModelFactory =
+        new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+    _helixManager.getStateMachineEngine()
+        .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
     _helixManager.getStateMachineEngine()
-        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
+        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+            stateModelFactoryWithRealtime);
     // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
     // the property store, but before receiving state transitions
     _helixManager.addPreConnectCallback(_serverInstance::startDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..0a0608b44d
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Server-side Helix state model that defines how a server handles the following segment operations:
+ * 1. Add a new segment.
+ * 2. Refresh an existing segment that is currently being served.
+ * 3. Delete an existing segment.
+ *
+ * This only works for OFFLINE table segments and does not handle the CONSUMING state at all.
+ */
+public class OfflineSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private final String _instanceId;
+  private final InstanceDataManager _instanceDataManager;
+
+  public OfflineSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+    _instanceId = instanceId;
+    _instanceDataManager = instanceDataManager;
+  }
+
+  public static String getStateModelName() {
+    return "OfflineSegmentOnlineOfflineStateModel";
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    return new OfflineSegmentOnlineOfflineStateModelFactory.OfflineSegmentOnlineOfflineStateModel();
+  }
+
+
+  // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field.
+  // The transitions in the helix messages indicate the from/to states, and helix uses the
+  // Transition annotations (but only if StateModelInfo is defined).
+  @SuppressWarnings("unused")
+  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+  public class OfflineSegmentOnlineOfflineStateModel extends StateModel {
+    private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - OfflineSegmentOnlineOfflineStateModel");
+
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME),
+          "TableType is null or is a REALTIME table, offline state model should not be called for RT");
+      try {
+        _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s",
+                tableNameWithType, segmentName);
+        _logger.error(errorMessage, e);
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
+        if (tableDataManager != null) {
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Remove segment from InstanceDataManager.
+    // Still keep the data files in local.
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Delete segment from local directory.
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message);
+    }
+
+    @Transition(from = "ERROR", to = "DROPPED")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 42d1642c7b..76b6e779dc 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -44,6 +44,14 @@ import org.slf4j.LoggerFactory;
  * 1. Add a new segment
  * 2. Refresh an existed now serving segment.
  * 3. Delete an existed segment.
+ *
+ * TODO: Assess how to handle this state model for FULL-AUTO. Today we have states such as A -> B, B -> C and A -> C.
+ *       FULL-AUTO optimizes such transitions and only calls A -> C in such cases. Due to semantics we need to allow
+ *       REALTIME segments to directly move from OFFLINE -> ONLINE for completed segments which violates what FULL-AUTO
+ *       does. Due to this limitation we never see transitions from OFFLINE -> CONSUMING even though we need this
+ *       transition for all new CONSUMING segments.
+ *       To unblock the POC, for now we have moved the OFFLINE segment state model to a different
+ *       class: OfflineSegmentOnlineOfflineStateModelFactory
  */
 public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
   private final String _instanceId;
@@ -76,6 +84,15 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message);
       String realtimeTableName = message.getResourceName();
       String segmentName = message.getPartitionName();
+
+      // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+//      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+//      Preconditions.checkNotNull(tableType);
+//      if (tableType == TableType.OFFLINE) {
+//        _logger.info("OFFLINE->CONSUMING state transition called for OFFLINE table, treat this as a no-op");
+//        return;
+//      }
+
       try {
         _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
       } catch (Exception e) {
@@ -97,6 +114,26 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message);
       String realtimeTableName = message.getResourceName();
       String segmentName = message.getPartitionName();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      Preconditions.checkNotNull(tableType);
+
+      // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+//      if (tableType == TableType.OFFLINE) {
+//        try {
+//          _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName);
+//        } catch (Exception e) {
+//          String errorMessage = String.format(
+//              "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+//              realtimeTableName, segmentName);
+//          _logger.error(errorMessage, e);
+//          TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+//          if (tableDataManager != null) {
+//            tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(),
+//                errorMessage, e));
+//          }
+//          Utils.rethrowException(e);
+//        }
+//      } else {
       TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
       Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
       tableDataManager.onConsumingToOnline(segmentName);
@@ -131,6 +168,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
         tableDataManager.releaseSegment(acquiredSegment);
       }
     }
+//    }
 
     @Transition(from = "CONSUMING", to = "OFFLINE")
     public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index e5bd332c2b..dc8285472a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -51,7 +51,8 @@ public class HybridQuickstart extends Quickstart {
   public Map<String, Object> getConfigOverrides() {
     Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides());
     overrides.put("pinot.server.grpc.enable", "true");
-    overrides.put("pinot.server.grpc.port", "8090");
+    // The fixed gRPC port override below is commented out to allow running more than 1 server
+    // overrides.put("pinot.server.grpc.port", "8090");
     return overrides;
   }
 
@@ -114,7 +115,7 @@ public class HybridQuickstart extends Quickstart {
     quickstartTableRequests.addAll(bootstrapOfflineTableDirectories(quickstartTmpDir));
     quickstartTableRequests.addAll(bootstrapStreamTableDirectories(quickstartTmpDir));
     final QuickstartRunner runner =
-        new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 1, 1, quickstartRunnerDir,
+        new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 4, 1, quickstartRunnerDir,
             getConfigOverrides());
 
     startKafka();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
index 4ece1f5abf..b14cb9e240 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
@@ -29,6 +29,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -45,6 +46,7 @@ import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -210,9 +212,22 @@ public class MoveReplicaGroup extends AbstractBaseAdminCommand implements Comman
       @Override
       public IdealState apply(@Nullable IdealState input) {
         Map<String, Map<String, String>> existingMapField = input.getRecord().getMapFields();
+        Map<String, List<String>> existingListField = input.getRecord().getListFields();
 
+        TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableName);
         for (Map.Entry<String, Map<String, String>> segmentEntry : proposedIdealState.entrySet()) {
-          existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+          // existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+          if (tableType == TableType.REALTIME) {
+            // TODO: Update listField only once REALTIME uses FULL-AUTO
+            existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+          } else {
+            String segmentName = segmentEntry.getKey();
+            Map<String, String> segmentMapping = segmentEntry.getValue();
+            List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet());
+            Collections.sort(listOfHosts);
+            // TODO: Assess if we want to add the preferred list of hosts or not
+            existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */);
+          }
         }
         return input;
       }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index e1e80e8ae0..310b6dc296 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -157,7 +157,7 @@ public class QuickstartRunner {
           .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME)
           .setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath())
           .setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath())
-          .setConfigOverrides(_configOverrides);
+          .setConfigOverrides(_configOverrides).setMultiStageServerPort(8040 + i).setMultiStageRunnerPort(8100 + i);
       if (!serverStarter.execute()) {
         throw new RuntimeException("Failed to start Server");
       }
diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
index 856719e058..12d29fc83d 100644
--- a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
+++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
@@ -6,7 +6,7 @@
     "timeType": "DAYS",
     "segmentPushType": "APPEND",
     "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
-    "replication": "1"
+    "replication": "3"
   },
   "tenants": {},
   "fieldConfigList": [
@@ -44,7 +44,7 @@
         "coldTier": {
           "encodingType": "RAW",
           "indexes": {
-            "text": {
+            "inverted": {
               "enabled": "true"
             }
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org