You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2024/02/13 17:53:45 UTC

(pinot) branch full-auto-poc created (now 35e96d1eb5)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git


      at 35e96d1eb5 Initial POC code for hybrid table

This branch includes the following new commits:

     new 9864022a2f Initial POC code
     new 35e96d1eb5 Initial POC code for hybrid table

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


(pinot) 02/02: Initial POC code for hybrid table

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 35e96d1eb5a5662a62ab43dac4ad7ea9716f8b42
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Mon Feb 12 23:36:47 2024 -0800

    Initial POC code for hybrid table
---
 .../controller/helix/SegmentStatusChecker.java     |   6 +-
 ...imeSegmentOnlineOfflineStateModelGenerator.java |  72 +++++
 .../helix/core/PinotHelixResourceManager.java      |  30 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  17 +-
 .../realtime/MissingConsumingSegmentFinder.java    |   7 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  30 +-
 .../helix/core/util/HelixSetupUtils.java           |  21 +-
 .../realtime/RealtimeSegmentDataManager.java       |   7 +
 .../server/starter/helix/BaseServerStarter.java    |  13 +-
 ...ltimeSegmentOnlineOfflineStateModelFactory.java | 325 +++++++++++++++++++++
 .../airlineStats_realtime_table_config.json        |   4 +-
 11 files changed, 488 insertions(+), 44 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d0af31044f..2b9431b0f2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -256,8 +256,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
       int nReplicas = 0;
       int nIdeal = 0;
       nSegments++;
+      Map<String, String> partitionMap = idealState.getInstanceStateMap(partitionName);
+      if (partitionMap == null) {
+        continue;
+      }
       // Skip segments not online in ideal state
-      for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
+      for (Map.Entry<String, String> serverAndState : partitionMap.entrySet()) {
         if (serverAndState == null) {
           break;
         }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
new file mode 100644
index 0000000000..ec640131cb
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import org.apache.helix.model.StateModelDefinition;
+
+
+/**
+ * Realtime Segment state model generator describes the transitions for realtime segment states.
+ *
+ * Offline to Consuming, Consuming to Online, Consuming to Offline
+ * Online to Offline, Offline to Dropped
+ *
+ * This includes the CONSUMING state used by realtime segments
+ */
+public class PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator {
+  private PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator() {
+  }
+
+  public static final String PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL =
+      "RealtimeSegmentOnlineOfflineStateModel";
+
+  public static final String ONLINE_STATE = "ONLINE";
+  public static final String CONSUMING_STATE = "CONSUMING";
+  public static final String OFFLINE_STATE = "OFFLINE";
+  public static final String DROPPED_STATE = "DROPPED";
+
+  public static StateModelDefinition generatePinotStateModelDefinition() {
+    StateModelDefinition.Builder builder =
+        new StateModelDefinition.Builder(PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL);
+    builder.initialState(OFFLINE_STATE);
+
+    builder.addState(ONLINE_STATE);
+    builder.addState(CONSUMING_STATE);
+    builder.addState(OFFLINE_STATE);
+    builder.addState(DROPPED_STATE);
+    // Set the initial state when the node starts
+
+    // Add transitions between the states.
+    builder.addTransition(CONSUMING_STATE, ONLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, CONSUMING_STATE);
+//    builder.addTransition(OFFLINE_STATE, ONLINE_STATE);
+    builder.addTransition(CONSUMING_STATE, OFFLINE_STATE);
+    builder.addTransition(ONLINE_STATE, OFFLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, DROPPED_STATE);
+
+    // set constraints on states.
+    // dynamic (not static) upper bound on the number of replicas in ONLINE state
+    builder.dynamicUpperBound(ONLINE_STATE, "R");
+    // dynamic constraint, R means it should be derived based on the replication
+    // factor.
+
+    StateModelDefinition statemodelDefinition = builder.build();
+    return statemodelDefinition;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ae208715fd..0a308fb514 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1583,17 +1583,19 @@ public class PinotHelixResourceManager {
     Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
         "Invalid table type: %s", tableType);
 
-    IdealState idealState;
-    if (tableType == TableType.REALTIME) {
-      idealState =
-           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-               _enableBatchMessageMode);
-    } else {
-      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
-      idealState =
-          PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-              _enableBatchMessageMode);
-    }
+    IdealState idealState =
+        PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+            _enableBatchMessageMode);
+//    if (tableType == TableType.REALTIME) {
+//      idealState =
+//           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+//               _enableBatchMessageMode);
+//    } else {
+//      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
+//      idealState =
+//          PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+//              _enableBatchMessageMode);
+//    }
    // IdealState idealState =
    //     PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
    //         _enableBatchMessageMode);
@@ -2270,8 +2272,10 @@ public class PinotHelixResourceManager {
             TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
             if (tableType == TableType.REALTIME) {
               // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated
-              currentAssignment.put(segmentName,
-                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+              currentAssignmentList.put(segmentName, Collections.emptyList()
+                  /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
+//              currentAssignment.put(segmentName,
+//                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
             } else {
               // TODO: Assess whether to pass in an empty instance list or to set the preferred list
               currentAssignmentList.put(segmentName, Collections.emptyList()
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 63222df7e3..f02ea86dc7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -23,10 +23,12 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.slf4j.Logger;
@@ -57,12 +59,23 @@ public class PinotTableIdealStateBuilder {
   public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas,
       boolean enableBatchMessageMode) {
     LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    String stateModel;
+    if (tableType == null) {
+      throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType);
+    } else if (TableType.OFFLINE.equals(tableType)) {
+      stateModel =
+          PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    } else {
+      stateModel =
+          PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    }
+
     // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default
     // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too
     FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
     idealStateBuilder
-        .setStateModel(
-            PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setStateModel(stateModel)
         .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
         // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
         .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index f4192a5a1a..03d72600c7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -25,6 +25,7 @@ import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -141,8 +142,12 @@ public class MissingConsumingSegmentFinder {
             // Note that there is no problem in case the partition group has reached its end of life.
             SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher
                 .fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName());
+            String endOffset = segmentZKMetadata.getEndOffset();
+            if (StringUtils.isEmpty(endOffset)) {
+              return;
+            }
             StreamPartitionMsgOffset completedSegmentEndOffset =
-                _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
+                _streamPartitionMsgOffsetFactory.create(endOffset);
             if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
               // there are unconsumed messages available on the stream
               missingSegmentInfo._totalCount++;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 003f985e3f..80aed78c04 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -71,7 +71,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
-import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
@@ -521,9 +520,16 @@ public class PinotLLCRealtimeSegmentManager {
     TableConfig tableConfig = getTableConfig(realtimeTableName);
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
     IdealState idealState = getIdealState(realtimeTableName);
-    Preconditions.checkState(
-        idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
-        "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+    // Check whether there is at least 1 replica in ONLINE state for full-auto mode.
+    if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
+      Preconditions.checkState(
+          idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE),
+          "Failed to find instance in ONLINE state in IdealState for segment: %s", committingSegmentName);
+    } else {
+      Preconditions.checkState(
+          idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+          "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+    }
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     /*
@@ -993,9 +999,11 @@ public class PinotLLCRealtimeSegmentManager {
     // TODO: Need to figure out the best way to handle committed segments' state change
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
-      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
-      instanceStatesMap.put(committingSegmentName,
-          SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+//      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+//      instanceStatesMap.put(committingSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+      instanceStatesList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
     }
 
@@ -1029,14 +1037,14 @@ public class PinotLLCRealtimeSegmentManager {
       List<String> instancesAssigned =
           segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
       // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
-      instanceStatesMap.put(newSegmentName,
-          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+//      instanceStatesMap.put(newSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
       // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list.
       //       Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support
       //       this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it
       //       looks like it does)
-      // instanceStatesList.put(newSegmentName, Collections.emptyList()
-      //    /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+       instanceStatesList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 61f6112365..a14010f17f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -47,7 +47,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
-import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,23 +139,24 @@ public class HelixSetupUtils {
 
   private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
       HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) {
-    String segmentStateModelName =
-        PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+    String realtimeSegmentStateModelName =
+        PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition stateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, realtimeSegmentStateModelName);
     if (stateModelDefinition == null || isUpdateStateModel) {
       if (stateModelDefinition == null) {
-        LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName);
+        LOGGER.info("Adding state model: {} with CONSUMING state", realtimeSegmentStateModelName);
       } else {
-        LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName);
+        LOGGER.info("Updating state model: {} to contain CONSUMING state", realtimeSegmentStateModelName);
       }
-      helixDataAccessor
-          .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+      helixDataAccessor.createStateModelDef(
+          PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
 
     String offlineSegmentStateModelName =
         PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName,
-        offlineSegmentStateModelName);
+    StateModelDefinition offlineStateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, offlineSegmentStateModelName);
     if (offlineStateModelDefinition == null || isUpdateStateModel) {
       if (stateModelDefinition == null) {
         LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 282f3c72a1..5cdb3dda3a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -862,6 +862,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING;
   }
 
+  /**
+   * Returns the state of the realtime segment.
+   */
+  public State getState() {
+    return _state;
+  }
+
   /**
    * Returns the timestamp of the last consumed message.
    */
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6e0d157075..9862ffb7dd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -579,15 +579,20 @@ public abstract class BaseServerStarter implements ServiceStartable {
     Tracing.ThreadAccountantOps
         .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
     initSegmentFetcher(_serverConf);
-    StateModelFactory<?> stateModelFactoryWithRealtime =
-        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+//    StateModelFactory<?> stateModelFactoryWithRealtime =
+//        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
     StateModelFactory<?> stateModelFactory =
         new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+    StateModelFactory<?> realtimeSegmentStateModelFactory =
+        new RealtimeSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
     _helixManager.getStateMachineEngine()
         .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
     _helixManager.getStateMachineEngine()
-        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
-            stateModelFactoryWithRealtime);
+        .registerStateModelFactory(RealtimeSegmentOnlineOfflineStateModelFactory.getStateModelName(),
+            realtimeSegmentStateModelFactory);
+//    _helixManager.getStateMachineEngine()
+//        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+//            stateModelFactoryWithRealtime);
     // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
     // the property store, but before receiving state transitions
     _helixManager.addPreConnectCallback(_serverInstance::startDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..58c785583a
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Data Server layer state model to take over how to operate on:
+ * 1. Add a new segment
+ * 2. Refresh an existing segment that is currently being served.
+ * 3. Delete an existing segment.
+ *
+ * This works for REALTIME table segments and handles the CONSUMING state
+ */
+public class RealtimeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private final String _instanceId;
+  private final InstanceDataManager _instanceDataManager;
+
+  public RealtimeSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+    _instanceId = instanceId;
+    _instanceDataManager = instanceDataManager;
+  }
+
+  public static String getStateModelName() {
+    return "RealtimeSegmentOnlineOfflineStateModel";
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    return new RealtimeSegmentOnlineOfflineStateModel();
+  }
+
+
+  // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field.
+  // The transitions in the helix messages indicate the from/to states, and helix uses the
+  // Transition annotations (but only if StateModelInfo is defined).
+  @SuppressWarnings("unused")
+  @StateModelInfo(states = "{'OFFLINE', 'CONSUMING', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+  public class RealtimeSegmentOnlineOfflineStateModel extends StateModel {
+    private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - RealtimeSegmentOnlineOfflineStateModel");
+
+    @Transition(from = "OFFLINE", to = "CONSUMING")
+    public void onBecomeConsumingFromOffline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+
+      // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+//      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+//      Preconditions.checkNotNull(tableType);
+//      if (tableType == TableType.OFFLINE) {
+//        _logger.info("OFFLINE->CONSUMING state transition called for OFFLINE table, treat this as a no-op");
+//        return;
+//      }
+
+      try {
+        _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s",
+                realtimeTableName, segmentName);
+        _logger.error(errorMessage, e);
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+        if (tableDataManager != null) {
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "CONSUMING", to = "ONLINE")
+    public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      Preconditions.checkNotNull(tableType);
+
+      // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+//      if (tableType == TableType.OFFLINE) {
+//        try {
+//          _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName);
+//        } catch (Exception e) {
+//          String errorMessage = String.format(
+//              "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+//              realtimeTableName, segmentName);
+//          _logger.error(errorMessage, e);
+//          TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+//          if (tableDataManager != null) {
+//            tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(),
+//                errorMessage, e));
+//          }
+//          Utils.rethrowException(e);
+//        }
+//      } else {
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToOnline(segmentName);
+      boolean isConsumingDone = false;
+      while (!isConsumingDone) {
+        SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName);
+        // For this transition to be correct in helix, we should already have a segment that is consuming
+        Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName,
+            realtimeTableName);
+
+        // TODO: https://github.com/apache/pinot/issues/10049
+        try {
+          // This indicates that the realtime segment is already a completed immutable segment.
+          // So nothing to do for this transition.
+          if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) {
+            // We found an LLC segment that is not consuming right now, must be that we already swapped it with a
+            // segment that has been built. Nothing to do for this state transition.
+            _logger.info(
+                "Segment {} not an instance of RealtimeSegmentDataManager. Reporting success for the transition",
+                acquiredSegment.getSegmentName());
+            return;
+          }
+          RealtimeSegmentDataManager segmentDataManager = (RealtimeSegmentDataManager) acquiredSegment;
+          RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+          if (state.shouldConsume()) {
+            Thread.sleep(10_000L);
+          } else {
+            SegmentZKMetadata segmentZKMetadata =
+                ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName,
+                    segmentName);
+            segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
+            isConsumingDone = true;
+          }
+
+//          // Use a single thread to wait for the consuming segment to be fully completed
+//          ExecutorService executorService = Executors.newSingleThreadExecutor();
+//          Future<Boolean> future = executorService.submit(() -> {
+//            try {
+//              while (true) {
+//                RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+//                if (state.shouldConsume()) {
+//                  Thread.sleep(5_000L);
+//                } else {
+//                  segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
+//                  return true;
+//                }
+////              RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+////              if (state.isFinal()) {
+////                return true;
+////              }
+//              }
+//            } catch (InterruptedException e) {
+//              throw new RuntimeException(e);
+//            }
+//          });
+//          future.get();
+//          executorService.shutdown();
+        } catch (Exception e) {
+          String errorMessage =
+              String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+                  realtimeTableName, segmentName);
+          _logger.error(errorMessage, e);
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          Utils.rethrowException(e);
+        } finally {
+          tableDataManager.releaseSegment(acquiredSegment);
+        }
+      }
+    }
+
+    @Transition(from = "CONSUMING", to = "OFFLINE")
+    public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}",
+            realtimeTableName, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "CONSUMING", to = "DROPPED")
+    public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToDropped(segmentName);
+      try {
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+        _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}",
+            realtimeTableName, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+
+//    @Transition(from = "OFFLINE", to = "ONLINE")
+//    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+//      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message);
+//      String tableNameWithType = message.getResourceName();
+//      String segmentName = message.getPartitionName();
+//      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+//      Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME),
+//          "TableType is null or is a REALTIME table, offline state model should not be called fo RT");
+//      try {
+//        _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
+//      } catch (Exception e) {
+//        String errorMessage =
+//            String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s",
+//                tableNameWithType, segmentName);
+//        _logger.error(errorMessage, e);
+//        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
+//        if (tableDataManager != null) {
+//          tableDataManager.addSegmentError(segmentName,
+//              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+//        }
+//        Utils.rethrowException(e);
+//      }
+//    }
+
+    // Remove segment from InstanceDataManager.
+    // The data files are still kept in the local directory.
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Delete segment from local directory.
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message);
+    }
+
+    @Transition(from = "ERROR", to = "DROPPED")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+  }
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
index 9d1dcd7e14..018ee3b6e8 100644
--- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
@@ -6,7 +6,7 @@
     "timeColumnName": "DaysSinceEpoch",
     "retentionTimeUnit": "DAYS",
     "retentionTimeValue": "5",
-    "replication": "1"
+    "replication": "3"
   },
   "tableIndexConfig": {},
   "routing": {
@@ -26,7 +26,7 @@
           "stream.kafka.zk.broker.url": "localhost:2191/kafka",
           "stream.kafka.broker.list": "localhost:19092",
           "realtime.segment.flush.threshold.time": "3600000",
-          "realtime.segment.flush.threshold.size": "50000"
+          "realtime.segment.flush.threshold.size": "500"
         }
       ]
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


(pinot) 01/02: Initial POC code

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 9864022a2f0e9a7a8c5f2e3e6ab07ae89f658497
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Wed Jan 17 13:31:58 2024 -0800

    Initial POC code
---
 .../common/assignment/InstancePartitionsUtils.java |  11 +-
 .../pinot/common/utils/helix/HelixHelper.java      |  10 +-
 .../pinot/controller/LeadControllerManager.java    |   1 +
 .../PinotInstanceAssignmentRestletResource.java    |   4 +-
 .../api/resources/PinotTenantRestletResource.java  |   3 +-
 ...ineSegmentOnlineOfflineStateModelGenerator.java |  65 +++++++++
 .../helix/core/PinotHelixResourceManager.java      |  46 ++++--
 ...lixSegmentOnlineOfflineStateModelGenerator.java |   2 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  30 ++++
 .../assignment/segment/SegmentAssignmentUtils.java |   8 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  50 ++++---
 .../helix/core/rebalance/TableRebalancer.java      |  46 +++++-
 .../helix/core/util/HelixSetupUtils.java           |  19 +++
 .../PinotLLCRealtimeSegmentManagerTest.java        |   6 +-
 .../integration/tests/HelixZNodeSizeLimitTest.java |  19 ++-
 .../server/starter/helix/BaseServerStarter.java    |   9 +-
 ...flineSegmentOnlineOfflineStateModelFactory.java | 162 +++++++++++++++++++++
 .../SegmentOnlineOfflineStateModelFactory.java     |  38 +++++
 .../org/apache/pinot/tools/HybridQuickstart.java   |   5 +-
 .../tools/admin/command/MoveReplicaGroup.java      |  17 ++-
 .../tools/admin/command/QuickstartRunner.java      |   2 +-
 .../airlineStats_offline_table_config.json         |   4 +-
 22 files changed, 501 insertions(+), 56 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
index 759d387af4..f8bbd08934 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
@@ -23,7 +23,9 @@ import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -165,12 +167,19 @@ public class InstancePartitionsUtils {
    * Persists the instance partitions to Helix property store.
    */
   public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
-      InstancePartitions instancePartitions) {
+      ConfigAccessor configAccessor, String helixClusterName, InstancePartitions instancePartitions) {
     String path = ZKMetadataProvider
         .constructPropertyStorePathForInstancePartitions(instancePartitions.getInstancePartitionsName());
     if (!propertyStore.set(path, instancePartitions.toZNRecord(), AccessOption.PERSISTENT)) {
       throw new ZkException("Failed to persist instance partitions: " + instancePartitions);
     }
+
+    // Set the INSTANCE_PARTITIONS under the RESOURCE config (only modifying set path for now to ensure it works)
+    // This is just a test to see how to access and update the CONFIG/RESOURCES
+    String resourceName = "INSTANCE_PARTITION_" + instancePartitions.getInstancePartitionsName();
+    ResourceConfig resourceConfig = new ResourceConfig(resourceName);
+    resourceConfig.setPreferenceLists(instancePartitions.getPartitionToInstancesMap());
+    configAccessor.setResourceConfig(helixClusterName, resourceName, resourceConfig);
   }
 
   /**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index 4160dd44ef..8f76ba801f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -185,9 +185,12 @@ public class HelixHelper {
             String partitionName = it.next();
             int numChars = partitionName.length();
             Map<String, String> stateMap = is.getInstanceStateMap(partitionName);
-            for (Map.Entry<String, String> entry : stateMap.entrySet()) {
-              numChars += entry.getKey().length();
-              numChars += entry.getValue().length();
+            if (stateMap != null) {
+              // The stateMap might be NULL for FULL-AUTO segments, so always do this NULL check
+              for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+                numChars += entry.getKey().length();
+                numChars += entry.getValue().length();
+              }
             }
             numChars *= is.getNumPartitions();
             if (_minNumCharsInISToTurnOnCompression > 0
@@ -200,6 +203,7 @@ public class HelixHelper {
       });
       return idealStateWrapper._idealState;
     } catch (Exception e) {
+      LOGGER.error("Caught exception while updating ideal state for resource: " + resourceName, e);
       throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
index 5c595bae2a..a1a4f7c270 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
@@ -220,6 +220,7 @@ public class LeadControllerManager {
       return;
     }
 
+    LOGGER.info("************** onResourceConfigChange() fired ***************");
     boolean leadControllerResourceEnabled;
     try {
       leadControllerResourceEnabled = LeadControllerUtils.isLeadControllerResourceEnabled(_helixManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 282431e04b..6fd157d620 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -301,7 +301,9 @@ public class PinotInstanceAssignmentRestletResource {
   private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
     try {
       LOGGER.info("Persisting instance partitions: {}", instancePartitions);
-      InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(), instancePartitions);
+      InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(),
+          _resourceManager.getHelixZkManager().getConfigAccessor(), _resourceManager.getHelixClusterName(),
+          instancePartitions);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
           Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 8166427a93..1fea68c75f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -359,7 +359,8 @@ public class PinotTenantRestletResource {
     try {
       LOGGER.info("Persisting instance partitions: {}", instancePartitions);
       InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
-          instancePartitions);
+          _pinotHelixResourceManager.getHelixZkManager().getConfigAccessor(),
+          _pinotHelixResourceManager.getHelixClusterName(), instancePartitions);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
           Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java
new file mode 100644
index 0000000000..e4f93b3a52
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * Offline Segment state model generator describes the transitions for offline segment states.
+ *
+ * Online to Offline, Online to Dropped
+ * Offline to Online, Offline to Dropped
+ *
+ * This does not include the state transitions for realtime segments (which includes the CONSUMING state)
+ */
+public class PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator {
+  private PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator() {
+  }
+
+  public static final String PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL = "OfflineSegmentOnlineOfflineStateModel";
+
+  public static final String ONLINE_STATE = "ONLINE";
+  public static final String OFFLINE_STATE = "OFFLINE";
+  public static final String DROPPED_STATE = "DROPPED";
+
+  public static StateModelDefinition generatePinotStateModelDefinition() {
+    StateModelDefinition.Builder builder =
+        new StateModelDefinition.Builder(PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL);
+    builder.initialState(OFFLINE_STATE);
+
+    builder.addState(ONLINE_STATE);
+    builder.addState(OFFLINE_STATE);
+    builder.addState(DROPPED_STATE);
+    // The initial state when the node starts is set above via initialState(OFFLINE_STATE).
+
+    // Add transitions between the states.
+    builder.addTransition(OFFLINE_STATE, ONLINE_STATE);
+    builder.addTransition(ONLINE_STATE, OFFLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, DROPPED_STATE);
+
+    // set constraints on states.
+    // no static constraint is set; the only bound is the dynamic one below
+    builder.dynamicUpperBound(ONLINE_STATE, "R");
+    // dynamic constraint, R means it should be derived based on the replication
+    // factor.
+
+    StateModelDefinition statemodelDefinition = builder.build();
+    return statemodelDefinition;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 35c942aba6..ae208715fd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1579,16 +1579,29 @@ public class PinotHelixResourceManager {
     LOGGER.info("Adding table {}: Validate table configs", tableNameWithType);
     validateTableTenantConfig(tableConfig);
 
-    IdealState idealState =
-        PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-            _enableBatchMessageMode);
     TableType tableType = tableConfig.getTableType();
+    Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
+        "Invalid table type: %s", tableType);
+
+    IdealState idealState;
+    if (tableType == TableType.REALTIME) {
+      idealState =
+           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+               _enableBatchMessageMode);
+    } else {
+      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
+      idealState =
+          PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+              _enableBatchMessageMode);
+    }
+   // IdealState idealState =
+   //     PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+   //         _enableBatchMessageMode);
+
     // Ensure that table is not created if schema is not present
     if (ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType)) == null) {
       throw new InvalidTableConfigException("No schema defined for table: " + tableNameWithType);
     }
-    Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
-        "Invalid table type: %s", tableType);
 
     // Add table config
     LOGGER.info("Adding table {}: Creating table config in the property store", tableNameWithType);
@@ -1785,7 +1798,8 @@ public class PinotHelixResourceManager {
                 referenceInstancePartitionsName);
           }
         }
-        InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
+        InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(),
+            _helixClusterName, instancePartitions);
       }
     }
 
@@ -1803,7 +1817,8 @@ public class PinotHelixResourceManager {
                 instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null,
                     tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
             LOGGER.info("Persisting instance partitions: {}", instancePartitions);
-            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
+            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(),
+                _helixClusterName, instancePartitions);
           }
         }
       }
@@ -2243,7 +2258,8 @@ public class PinotHelixResourceManager {
         HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
           assert idealState != null;
           Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
-          if (currentAssignment.containsKey(segmentName)) {
+          Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
+          if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) {
             LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
                 tableNameWithType);
           } else {
@@ -2251,8 +2267,18 @@ public class PinotHelixResourceManager {
                 segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
             LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
                 tableNameWithType);
-            currentAssignment.put(segmentName,
-                SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+            TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+            if (tableType == TableType.REALTIME) {
+              // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated
+              currentAssignment.put(segmentName,
+                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+            } else {
+              // TODO: Assess whether to pass in an empty instance list or to set the preferred list
+              currentAssignmentList.put(segmentName, Collections.emptyList()
+                  /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
+            }
+            // currentAssignment.put(segmentName,
+            //     SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
           }
           return idealState;
         });
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
index 18b6ac0a75..3e52de0359 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
@@ -69,7 +69,7 @@ public class PinotHelixSegmentOnlineOfflineStateModelGenerator {
     builder.dynamicUpperBound(ONLINE_STATE, "R");
     // dynamic constraint, R means it should be derived based on the replication
     // factor.
-    builder.dynamicUpperBound(CONSUMING_STATE, "R");
+    // builder.dynamicUpperBound(CONSUMING_STATE, "R");
 
     StateModelDefinition statemodelDefinition = builder.build();
     return statemodelDefinition;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 23a115417f..63222df7e3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.controller.helix.core;
 
 import java.util.List;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
@@ -41,6 +43,7 @@ public class PinotTableIdealStateBuilder {
 
   public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int numReplicas,
       boolean enableBatchMessageMode) {
+    LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
     CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType);
     customModeIdealStateBuilder
         .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
@@ -51,6 +54,33 @@ public class PinotTableIdealStateBuilder {
     return idealState;
   }
 
+  /** Builds an empty FULL-AUTO IdealState (zero partitions) for the given table with the given replica count. */
+  public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas,
+      boolean enableBatchMessageMode) {
+    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    // FULL-AUTO Segment Online-Offline state model with a rebalance strategy; CrushEdRebalanceStrategy for now
+    // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too
+    FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
+    idealStateBuilder
+        .setStateModel(
+            PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
+        // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
+        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+    // Delayed rebalance: the intent is that no partition movement happens while the number of active replicas is no
+    // less than the min active replica count. Min active replicas is set to numReplicas - 1 and the delay to 5
+    // minutes, so if an instance goes offline Helix waits at most 5 minutes before re-calculating the assignment.
+    // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added
+    idealStateBuilder.setMinActiveReplica(numReplicas - 1);
+    idealStateBuilder.setRebalanceDelay(300_000);
+    idealStateBuilder.enableDelayRebalance();
+    // Build the IdealState, then set the instance group tag and batch message mode on it
+    IdealState idealState = idealStateBuilder.build();
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+
   /**
    * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream,
    * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 52f736a555..46e4cc4eca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -272,6 +273,13 @@ public class SegmentAssignmentUtils {
     return instanceStateMap;
   }
 
+  /** Returns a new list containing the given instances sorted in their natural order; the input is not modified. */
+  public static List<String> getInstanceStateList(Collection<String> instances) {
+    List<String> sortedInstances = new ArrayList<>(instances);
+    Collections.sort(sortedInstances);
+    return sortedInstances;
+  }
+
   /**
    * Returns a map from instance name to number of segments to be moved to it.
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 25e40084ab..003f985e3f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -332,12 +332,13 @@ public class PinotLLCRealtimeSegmentManager {
 
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    Map<String, List<String>> instancesStateList = idealState.getRecord().getListFields();
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
               numPartitionGroups, numReplicas);
-      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
-          instancePartitionsMap);
+      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, segmentName,
+          segmentAssignment, instancePartitionsMap);
     }
 
     setIdealState(realtimeTableName, idealState);
@@ -818,6 +819,7 @@ public class PinotLLCRealtimeSegmentManager {
     try {
       HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
         assert idealState != null;
+        // TODO: how to handle such state updates for FULL-AUTO mode? So far we don't enable FULL-AUTO for REALTIME
         Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName);
         String state = stateMap.get(instanceName);
         if (SegmentStateModel.CONSUMING.equals(state)) {
@@ -972,7 +974,8 @@ public class PinotLLCRealtimeSegmentManager {
         throw new HelixHelper.PermanentUpdaterException(
             "Exceeded max segment completion time for segment " + committingSegmentName);
       }
-      updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
+      updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
+          idealState.getRecord().getListFields(), committingSegmentName,
           isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
@@ -984,8 +987,10 @@ public class PinotLLCRealtimeSegmentManager {
 
   @VisibleForTesting
   void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
+      Map<String, List<String>> instanceStatesList,
       @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    // TODO: Need to figure out the best way to handle committed segments' state change
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
       Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
@@ -1023,8 +1028,15 @@ public class PinotLLCRealtimeSegmentManager {
       // Assign instances to the new segment and add instances as state CONSUMING
       List<String> instancesAssigned =
           segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+      // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
       instanceStatesMap.put(newSegmentName,
           SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+      // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list.
+      //       Assess whether we should set an empty InstanceStateList for the segment or not. i.e. do we support
+      //       this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it
+      //       looks like it does)
+      // instanceStatesList.put(newSegmentName, Collections.emptyList()
+      //    /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
     }
   }
@@ -1116,6 +1128,7 @@ public class PinotLLCRealtimeSegmentManager {
         Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
 
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    Map<String, List<String>> instanceStatesList = idealState.getRecord().getListFields();
     long currentTimeMs = getCurrentTimeMs();
     StreamPartitionMsgOffsetFactory offsetFactory =
         StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
@@ -1181,14 +1194,14 @@ public class PinotLLCRealtimeSegmentManager {
                       (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
               createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
                   committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
-              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
-                  segmentAssignment, instancePartitionsMap);
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName,
+                  newSegmentName, segmentAssignment, instancePartitionsMap);
             } else { // partition group reached end of life
               LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
                       + "Skipping creation of new ZK metadata and new segment in ideal state", partitionGroupId,
                   latestSegmentName);
-              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
-                  instancePartitionsMap);
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName,
+                  null, segmentAssignment, instancePartitionsMap);
             }
           }
           // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
@@ -1212,8 +1225,8 @@ public class PinotLLCRealtimeSegmentManager {
                     partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
                     latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
             createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
-                instancePartitionsMap, startOffset);
+                newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
+                segmentAssignment, instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
               if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
@@ -1231,8 +1244,8 @@ public class PinotLLCRealtimeSegmentManager {
                         partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
                         latestSegmentZKMetadata.getEndOffset());
                 createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                    newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
-                    instancePartitionsMap, startOffset);
+                    newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
+                    segmentAssignment, instancePartitionsMap, startOffset);
               } else {
                 LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
                     latestSegmentName);
@@ -1269,8 +1282,8 @@ public class PinotLLCRealtimeSegmentManager {
                 partitionGroupId, realtimeTableName);
             _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
           }
-          updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName,
-              segmentAssignment, instancePartitionsMap);
+          updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, previousConsumingSegment,
+              latestSegmentName, segmentAssignment, instancePartitionsMap);
         } else {
           LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}",
               latestSegmentZKMetadata.getStatus(), latestSegmentName);
@@ -1285,8 +1298,8 @@ public class PinotLLCRealtimeSegmentManager {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
                 numPartitions, numReplicas);
-        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
-            instancePartitionsMap);
+        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, null, newSegmentName,
+            segmentAssignment, instancePartitionsMap);
       }
     }
 
@@ -1296,7 +1309,8 @@ public class PinotLLCRealtimeSegmentManager {
   private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig,
       SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
       List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
-      Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
+      Map<String, Map<String, String>> instanceStatesMap, Map<String, List<String>> instancesStateList,
+      SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
     int numPartitions = newPartitionGroupMetadataList.size();
@@ -1307,8 +1321,8 @@ public class PinotLLCRealtimeSegmentManager {
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
         latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
     String newSegmentName = newLLCSegmentName.getSegmentName();
-    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
-        instancePartitionsMap);
+    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, newSegmentName,
+        segmentAssignment, instancePartitionsMap);
   }
 
   private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 5c3514fa7e..f97aa7b06a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.rebalance;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -158,6 +159,16 @@ public class TableRebalancer {
     }
   }
 
+  /** Builds list fields per segment: its instance names if setInstances is true, otherwise an empty list. */
+  private Map<String, List<String>> convertTargetAssignmentToListFields(
+      Map<String, Map<String, String>> targetAssignment, boolean setInstances) {
+    Map<String, List<String>> listFields = new HashMap<>();
+    // Each value is either a fresh copy of the segment's instance names or the shared immutable empty list
+    targetAssignment.forEach((segmentName, instanceStateMap) -> listFields.put(segmentName,
+        setInstances ? new ArrayList<>(instanceStateMap.keySet()) : Collections.emptyList()));
+    return listFields;
+  }
+
   private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig,
       @Nullable String rebalanceJobId) {
     long startTimeMs = System.currentTimeMillis();
@@ -299,8 +310,19 @@ public class TableRebalancer {
       LOGGER.info("For rebalanceId: {}, rebalancing table: {} with downtime", rebalanceJobId, tableNameWithType);
 
       // Reuse current IdealState to update the IdealState in cluster
+      // TODO: Assess how rebalance will change if we use FULL-AUTO mode. In FULL-AUTO mode the mapFields should not
+      //       be set up at all. Maybe we don't even need this TableRebalancer?
+      TableType tableType = tableConfig.getTableType();
       ZNRecord idealStateRecord = currentIdealState.getRecord();
-      idealStateRecord.setMapFields(targetAssignment);
+      if (tableType == TableType.REALTIME) {
+        // TODO: Only set listFields once REALTIME tables use FULL-AUTO
+        idealStateRecord.setMapFields(targetAssignment);
+      } else {
+        // TODO: Assess whether we should set the preferred host list or not, for now setting empty list
+        Map<String, List<String>> listFieldsConverted = convertTargetAssignmentToListFields(targetAssignment, false);
+        idealStateRecord.setListFields(listFieldsConverted);
+      }
+      // idealStateRecord.setMapFields(targetAssignment);
       currentIdealState.setNumPartitions(targetAssignment.size());
       currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size()));
 
@@ -496,7 +518,18 @@ public class TableRebalancer {
           SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));
 
       // Reuse current IdealState to update the IdealState in cluster
-      idealStateRecord.setMapFields(nextAssignment);
+      // TODO: Assess how rebalance will change if we use FULL-AUTO mode. In FULL-AUTO mode the mapFields should not
+      //       be set up at all. Maybe we don't even need this TableRebalancer?
+      // Persist nextAssignment (as before this change), consistent with the partition/replica counts set below
+      TableType tableType = tableConfig.getTableType();
+      if (tableType == TableType.REALTIME) {
+        // TODO: Only set listFields once REALTIME tables use FULL-AUTO
+        idealStateRecord.setMapFields(nextAssignment);
+      } else {
+        // TODO: Assess whether we should set the preferred host list or not, for now setting empty list
+        Map<String, List<String>> listFieldsConverted = convertTargetAssignmentToListFields(nextAssignment, false);
+        idealStateRecord.setListFields(listFieldsConverted);
+      }
       idealState.setNumPartitions(nextAssignment.size());
       idealState.setReplicas(Integer.toString(nextAssignment.values().iterator().next().size()));
 
@@ -598,7 +631,7 @@ public class TableRebalancer {
           if (!dryRun && !instancePartitionsUnchanged) {
             LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
             InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                instancePartitions);
+                _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
           }
         } else {
           String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
@@ -614,7 +647,7 @@ public class TableRebalancer {
               LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions,
                   preConfiguredInstancePartitions);
               InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                  instancePartitions);
+                  _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
             }
           } else {
             instancePartitions =
@@ -625,7 +658,7 @@ public class TableRebalancer {
               LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
                   referenceInstancePartitionsName);
               InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
-                  instancePartitions);
+                  _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
             }
           }
         }
@@ -730,7 +763,8 @@ public class TableRebalancer {
         boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
         if (!dryRun && !instancePartitionsUnchanged) {
           LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
-          InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
+          InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
+              _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
         }
         return Pair.of(instancePartitions, instancePartitionsUnchanged);
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 8d21d18b1f..61f6112365 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -31,6 +31,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
@@ -45,6 +46,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
@@ -79,6 +81,8 @@ public class HelixSetupUtils {
             new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
         Map<String, String> configMap = new HashMap<>();
         configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.toString(true));
+        configMap.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
+            Boolean.toString(true));
         configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(DEFAULT_ENABLE_CASE_INSENSITIVE));
         configMap.put(DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M));
         configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.toString(false));
@@ -147,6 +151,21 @@ public class HelixSetupUtils {
       helixDataAccessor
           .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
+
+    String offlineSegmentStateModelName =
+        PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName,
+        offlineSegmentStateModelName);
+    if (offlineStateModelDefinition == null || isUpdateStateModel) {
+      if (offlineStateModelDefinition == null) { // test the offline model fetched above, not the segment model
+        LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName);
+      } else {
+        LOGGER.info("Updating offline segment state model: {} to contain CONSUMING state",
+            offlineSegmentStateModelName);
+      }
+      helixDataAccessor.createStateModelDef(
+          PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+    }
   }
 
   private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 60b83ba24a..a882a47ec7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1221,9 +1221,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
     void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
         String newSegmentName, SegmentAssignment segmentAssignment,
         Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null,
+      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
+          _idealState.getRecord().getListFields(), committingSegmentName, null,
           segmentAssignment, instancePartitionsMap);
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName,
+      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
+          _idealState.getRecord().getListFields(), null, newSegmentName,
           segmentAssignment, instancePartitionsMap);
     }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
index 37d3aa6ca3..9dec9df6b6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
@@ -19,7 +19,8 @@
 
 package org.apache.pinot.integration.tests;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
 import java.util.Map;
 import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -83,15 +84,23 @@ public class HelixZNodeSizeLimitTest extends BaseClusterIntegrationTest {
     try {
       HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
         Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+        Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
         for (int i = 0; i < 500_000; i++) {
-          currentAssignment.put("segment_" + i,
-              ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 10), "ONLINE"));
-          currentAssignment.put("segment_" + i,
-              ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 9), "ONLINE"));
+         // currentAssignment.put("segment_" + i,
+         //     ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 10), "ONLINE"));
+         // currentAssignment.put("segment_" + i,
+         //     ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 9), "ONLINE"));
+          currentAssignmentList.put("segment_" + i,
+              ImmutableList.of("Server_with_some_reasonable_long_prefix_" + (i % 10)));
+          currentAssignmentList.put("segment_" + i,
+              ImmutableList.of("Server_with_some_reasonable_long_prefix_" + (i % 9)));
         }
         return idealState;
       });
     } catch (Exception e) {
+      System.out.println("exception: " + e);
+      System.out.println("exception: " + e.getMessage());
+      System.out.println("exception: " + e.getCause());
       Assert.fail("Exception shouldn't be thrown even if the data size of the ideal state is larger than 1M");
     }
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 2b771707d2..6e0d157075 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -579,10 +579,15 @@ public abstract class BaseServerStarter implements ServiceStartable {
     Tracing.ThreadAccountantOps
         .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
     initSegmentFetcher(_serverConf);
-    StateModelFactory<?> stateModelFactory =
+    StateModelFactory<?> stateModelFactoryWithRealtime =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+    StateModelFactory<?> stateModelFactory =
+        new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+    _helixManager.getStateMachineEngine()
+        .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
     _helixManager.getStateMachineEngine()
-        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
+        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+            stateModelFactoryWithRealtime);
     // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
     // the property store, but before receiving state transitions
     _helixManager.addPreConnectCallback(_serverInstance::startDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..0a0608b44d
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Data Server layer state model to take over how to operate on:
+ * 1. Add a new segment
+ * 2. Refresh an existing segment that is currently being served.
+ * 3. Delete an existing segment.
+ *
+ * This only works for OFFLINE table segments and does not handle the CONSUMING state at all.
+ */
+public class OfflineSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private final String _instanceId;
+  private final InstanceDataManager _instanceDataManager;
+
+  public OfflineSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+    _instanceId = instanceId;
+    _instanceDataManager = instanceDataManager;
+  }
+
+  public static String getStateModelName() {
+    return "OfflineSegmentOnlineOfflineStateModel";
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    return new OfflineSegmentOnlineOfflineStateModelFactory.OfflineSegmentOnlineOfflineStateModel();
+  }
+
+
+  // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field.
+  // The transitions in the helix messages indicate the from/to states, and helix uses the
+  // Transition annotations (but only if StateModelInfo is defined).
+  @SuppressWarnings("unused")
+  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+  public class OfflineSegmentOnlineOfflineStateModel extends StateModel {
+    private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - OfflineSegmentOnlineOfflineStateModel");
+
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME),
+          "TableType is null or is a REALTIME table, offline state model should not be called fo RT");
+      try {
+        _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s",
+                tableNameWithType, segmentName);
+        _logger.error(errorMessage, e);
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
+        if (tableDataManager != null) {
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Remove the segment from InstanceDataManager.
+    // The data files are still kept on the local disk.
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Delete segment from local directory.
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message);
+    }
+
+    @Transition(from = "ERROR", to = "DROPPED")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 42d1642c7b..76b6e779dc 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -44,6 +44,14 @@ import org.slf4j.LoggerFactory;
  * 1. Add a new segment
  * 2. Refresh an existed now serving segment.
  * 3. Delete an existed segment.
+ *
+ * TODO: Assess how to handle this state model for FULL-AUTO. Today we have transitions such as A -> B, B -> C and
+ *       A -> C. FULL-AUTO optimizes such transitions and only calls A -> C in such cases. Due to semantics we need to
+ *       allow REALTIME segments to directly move from OFFLINE -> ONLINE for completed segments which violates what
+ *       FULL-AUTO does. Due to this limitation we never see transitions from OFFLINE -> CONSUMING even though we need
+ *       this transition for all new CONSUMING segments.
+ *       To unblock the POC, for now we have moved the OFFLINE segment state model to a different
+ *       class: OfflineSegmentOnlineOfflineStateModelFactory
  */
 public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
   private final String _instanceId;
@@ -76,6 +84,15 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message);
       String realtimeTableName = message.getResourceName();
       String segmentName = message.getPartitionName();
+
+      // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+//      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+//      Preconditions.checkNotNull(tableType);
+//      if (tableType == TableType.OFFLINE) {
+//        _logger.info("OFFLINE->CONSUMING state transition called for OFFLINE table, treat this as a no-op");
+//        return;
+//      }
+
       try {
         _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
       } catch (Exception e) {
@@ -97,6 +114,26 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message);
       String realtimeTableName = message.getResourceName();
       String segmentName = message.getPartitionName();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      Preconditions.checkNotNull(tableType);
+
+      // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+//      if (tableType == TableType.OFFLINE) {
+//        try {
+//          _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName);
+//        } catch (Exception e) {
+//          String errorMessage = String.format(
+//              "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+//              realtimeTableName, segmentName);
+//          _logger.error(errorMessage, e);
+//          TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+//          if (tableDataManager != null) {
+//            tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(),
+//                errorMessage, e));
+//          }
+//          Utils.rethrowException(e);
+//        }
+//      } else {
       TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
       Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
       tableDataManager.onConsumingToOnline(segmentName);
@@ -131,6 +168,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
         tableDataManager.releaseSegment(acquiredSegment);
       }
     }
+//    }
 
     @Transition(from = "CONSUMING", to = "OFFLINE")
     public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index e5bd332c2b..dc8285472a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -51,7 +51,8 @@ public class HybridQuickstart extends Quickstart {
   public Map<String, Object> getConfigOverrides() {
     Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides());
     overrides.put("pinot.server.grpc.enable", "true");
-    overrides.put("pinot.server.grpc.port", "8090");
+    // Commenting out the below to allow running more than 1 server
+    // overrides.put("pinot.server.grpc.port", "8090");
     return overrides;
   }
 
@@ -114,7 +115,7 @@ public class HybridQuickstart extends Quickstart {
     quickstartTableRequests.addAll(bootstrapOfflineTableDirectories(quickstartTmpDir));
     quickstartTableRequests.addAll(bootstrapStreamTableDirectories(quickstartTmpDir));
     final QuickstartRunner runner =
-        new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 1, 1, quickstartRunnerDir,
+        new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 4, 1, quickstartRunnerDir,
             getConfigOverrides());
 
     startKafka();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
index 4ece1f5abf..b14cb9e240 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
@@ -29,6 +29,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -45,6 +46,7 @@ import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -210,9 +212,22 @@ public class MoveReplicaGroup extends AbstractBaseAdminCommand implements Comman
       @Override
       public IdealState apply(@Nullable IdealState input) {
         Map<String, Map<String, String>> existingMapField = input.getRecord().getMapFields();
+        Map<String, List<String>> existingListField = input.getRecord().getListFields();
 
+        TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableName);
         for (Map.Entry<String, Map<String, String>> segmentEntry : proposedIdealState.entrySet()) {
-          existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+          // existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+          if (tableType == TableType.REALTIME) {
+            // TODO: Update listField only once REALTIME uses FULL-AUTO
+            existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+          } else {
+            String segmentName = segmentEntry.getKey();
+            Map<String, String> segmentMapping = segmentEntry.getValue();
+            List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet());
+            Collections.sort(listOfHosts);
+            // TODO: Assess if we want to add the preferred list of hosts or not
+            existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */);
+          }
         }
         return input;
       }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index e1e80e8ae0..310b6dc296 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -157,7 +157,7 @@ public class QuickstartRunner {
           .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME)
           .setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath())
           .setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath())
-          .setConfigOverrides(_configOverrides);
+          .setConfigOverrides(_configOverrides).setMultiStageServerPort(8040 + i).setMultiStageRunnerPort(8100 + i);
       if (!serverStarter.execute()) {
         throw new RuntimeException("Failed to start Server");
       }
diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
index 856719e058..12d29fc83d 100644
--- a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
+++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
@@ -6,7 +6,7 @@
     "timeType": "DAYS",
     "segmentPushType": "APPEND",
     "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
-    "replication": "1"
+    "replication": "3"
   },
   "tenants": {},
   "fieldConfigList": [
@@ -44,7 +44,7 @@
         "coldTier": {
           "encodingType": "RAW",
           "indexes": {
-            "text": {
+            "inverted": {
               "enabled": "true"
             }
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org