You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2024/02/15 19:46:09 UTC

(pinot) branch full-auto-poc updated (3c1ecd9495 -> 37dc958fbc)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git


 discard 3c1ecd9495 Initial POC code for hybrid table
     new 37dc958fbc Initial POC code for hybrid table

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3c1ecd9495)
            \
             N -- N -- N   refs/heads/full-auto-poc (37dc958fbc)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core/realtime/PinotLLCRealtimeSegmentManager.java    | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


(pinot) 01/01: Initial POC code for hybrid table

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 37dc958fbc8cc33ab500167e43e78d764f239297
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Mon Feb 12 23:36:47 2024 -0800

    Initial POC code for hybrid table
---
 .../pinot/broker/routing/BrokerRoutingManager.java |  11 +-
 .../instanceselector/BaseInstanceSelector.java     |  59 ++--
 .../StrictReplicaGroupInstanceSelector.java        |  34 ++-
 .../controller/helix/SegmentStatusChecker.java     |   6 +-
 ...imeSegmentOnlineOfflineStateModelGenerator.java |  72 +++++
 .../helix/core/PinotHelixResourceManager.java      |  30 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  17 +-
 .../realtime/MissingConsumingSegmentFinder.java    |   7 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  51 +++-
 .../helix/core/util/HelixSetupUtils.java           |  23 +-
 .../realtime/RealtimeSegmentDataManager.java       |   7 +
 .../tests/LLCRealtimeClusterIntegrationTest.java   |   8 +-
 .../server/starter/helix/BaseServerStarter.java    |  13 +-
 ...ltimeSegmentOnlineOfflineStateModelFactory.java | 325 +++++++++++++++++++++
 .../airlineStats_realtime_table_config.json        |   4 +-
 15 files changed, 574 insertions(+), 93 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index cc3a5354ef..b6be06653b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -431,7 +431,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
       externalViewVersion = externalView.getRecord().getVersion();
     }
 
-    Set<String> onlineSegments = getOnlineSegments(idealState);
+    Set<String> onlineSegments = getOnlineSegments(idealState, externalView);
 
     SegmentPreSelector segmentPreSelector =
         SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, _propertyStore);
@@ -480,7 +480,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
         if (offlineTableExternalView == null) {
           offlineTableExternalView = new ExternalView(offlineTableName);
         }
-        Set<String> offlineTableOnlineSegments = getOnlineSegments(offlineTableIdealState);
+        Set<String> offlineTableOnlineSegments = getOnlineSegments(offlineTableIdealState, offlineTableExternalView);
         SegmentPreSelector offlineTableSegmentPreSelector =
             SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, _propertyStore);
         Set<String> offlineTablePreSelectedOnlineSegments =
@@ -538,8 +538,11 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
   /**
    * Returns the online segments (with ONLINE/CONSUMING instances) in the given ideal state.
    */
-  private static Set<String> getOnlineSegments(IdealState idealState) {
+  private static Set<String> getOnlineSegments(IdealState idealState, ExternalView externalView) {
     Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
+    if (segmentAssignment.isEmpty()) {
+      segmentAssignment = externalView.getRecord().getMapFields();
+    }
     Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
     for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
       Map<String, String> instanceStateMap = entry.getValue();
@@ -777,7 +780,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
     // inconsistency between components, which is fine because the inconsistency only exists for the newly changed
     // segments and only lasts for a very short time.
     void onAssignmentChange(IdealState idealState, ExternalView externalView) {
-      Set<String> onlineSegments = getOnlineSegments(idealState);
+      Set<String> onlineSegments = getOnlineSegments(idealState, externalView);
       Set<String> preSelectedOnlineSegments = _segmentPreSelector.preSelect(onlineSegments);
       _segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments);
       _segmentSelector.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index b2961eef94..aebafa7741 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
@@ -130,11 +131,11 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   Map<String, Long> getNewSegmentCreationTimeMapFromZK(IdealState idealState, ExternalView externalView,
       Set<String> onlineSegments) {
     List<String> potentialNewSegments = new ArrayList<>();
-    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
+//    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
     Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
     for (String segment : onlineSegments) {
-      assert idealStateAssignment.containsKey(segment);
-      if (isPotentialNewSegment(idealStateAssignment.get(segment), externalViewAssignment.get(segment))) {
+      assert externalViewAssignment.containsKey(segment);
+      if (isPotentialNewSegment(externalViewAssignment.get(segment))) {
         potentialNewSegments.add(segment);
       }
     }
@@ -169,14 +170,13 @@ abstract class BaseInstanceSelector implements InstanceSelector {
    * - Any instance for the segment is in ERROR state
    * - External view for the segment converges with ideal state
    */
-  static boolean isPotentialNewSegment(Map<String, String> idealStateInstanceStateMap,
-      @Nullable Map<String, String> externalViewInstanceStateMap) {
+  static boolean isPotentialNewSegment(@Nullable Map<String, String> externalViewInstanceStateMap) {
     if (externalViewInstanceStateMap == null) {
       return true;
     }
     boolean hasConverged = true;
     // Only track ONLINE/CONSUMING instances within the ideal state
-    for (Map.Entry<String, String> entry : idealStateInstanceStateMap.entrySet()) {
+    for (Map.Entry<String, String> entry : externalViewInstanceStateMap.entrySet()) {
       if (isOnlineForRouting(entry.getValue())) {
         String externalViewState = externalViewInstanceStateMap.get(entry.getKey());
         if (externalViewState == null || externalViewState.equals(SegmentStateModel.OFFLINE)) {
@@ -192,14 +192,13 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   /**
    * Returns the online instances for routing purpose.
    */
-  static TreeSet<String> getOnlineInstances(Map<String, String> idealStateInstanceStateMap,
-      Map<String, String> externalViewInstanceStateMap) {
+  static TreeSet<String> getOnlineInstances(Map<String, String> externalViewInstanceStateMap) {
     TreeSet<String> onlineInstances = new TreeSet<>();
     // Only track ONLINE/CONSUMING instances within the ideal state
-    for (Map.Entry<String, String> entry : idealStateInstanceStateMap.entrySet()) {
+    for (Map.Entry<String, String> entry : externalViewInstanceStateMap.entrySet()) {
       String instance = entry.getKey();
       // NOTE: DO NOT check if EV matches IS because it is a valid state when EV is CONSUMING while IS is ONLINE
-      if (isOnlineForRouting(entry.getValue()) && isOnlineForRouting(externalViewInstanceStateMap.get(instance))) {
+      if (isOnlineForRouting(externalViewInstanceStateMap.get(instance))) {
         onlineInstances.add(instance);
       }
     }
@@ -217,6 +216,14 @@ abstract class BaseInstanceSelector implements InstanceSelector {
     }
   }
 
+  static SortedSet<String> convertToSortedSet(Set<String> set) {
+    if (set instanceof SortedSet) {
+      return (SortedSet<String>) set;
+    } else {
+      return new TreeSet<>(set);
+    }
+  }
+
   /**
    * Updates the segment maps based on the given ideal state, external view, online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}) and new segments.
@@ -229,20 +236,22 @@ abstract class BaseInstanceSelector implements InstanceSelector {
     _oldSegmentCandidatesMap.clear();
     _newSegmentStateMap = new HashMap<>(HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size()));
 
-    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
+//    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
+    Set<String> idealStateSegmentSet = idealState.getPartitionSet();
     Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
     for (String segment : onlineSegments) {
-      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
+//      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
+
       Long newSegmentCreationTimeMs = newSegmentCreationTimeMap.get(segment);
       Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment);
       if (externalViewInstanceStateMap == null) {
         if (newSegmentCreationTimeMs != null) {
           // New segment
-          List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size());
-          for (Map.Entry<String, String> entry : convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
-            if (isOnlineForRouting(entry.getValue())) {
-              candidates.add(new SegmentInstanceCandidate(entry.getKey(), false));
-            }
+          List<SegmentInstanceCandidate> candidates = new ArrayList<>(Integer.parseInt(idealState.getReplicas()));
+          for (String segmentName : convertToSortedSet(idealStateSegmentSet)) {
+//            if (isOnlineForRouting(entry.getValue())) {
+            candidates.add(new SegmentInstanceCandidate(segmentName, false));
+//            }
           }
           _newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMs, candidates));
         } else {
@@ -250,11 +259,11 @@ abstract class BaseInstanceSelector implements InstanceSelector {
           _oldSegmentCandidatesMap.put(segment, Collections.emptyList());
         }
       } else {
-        TreeSet<String> onlineInstances = getOnlineInstances(idealStateInstanceStateMap, externalViewInstanceStateMap);
+        TreeSet<String> onlineInstances = getOnlineInstances(externalViewInstanceStateMap);
         if (newSegmentCreationTimeMs != null) {
           // New segment
-          List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size());
-          for (Map.Entry<String, String> entry : convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
+          List<SegmentInstanceCandidate> candidates = new ArrayList<>(externalViewInstanceStateMap.size());
+          for (Map.Entry<String, String> entry : convertToSortedMap(externalViewInstanceStateMap).entrySet()) {
             if (isOnlineForRouting(entry.getValue())) {
               String instance = entry.getKey();
               candidates.add(new SegmentInstanceCandidate(instance, onlineInstances.contains(instance)));
@@ -365,7 +374,7 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   @Override
   public void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
     Map<String, Long> newSegmentCreationTimeMap =
-        getNewSegmentCreationTimeMapFromExistingStates(idealState, externalView, onlineSegments);
+        getNewSegmentCreationTimeMapFromExistingStates(externalView, onlineSegments);
     updateSegmentMaps(idealState, externalView, onlineSegments, newSegmentCreationTimeMap);
     refreshSegmentStates();
   }
@@ -373,11 +382,11 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   /**
    * Returns a map from new segment to their creation time based on the existing in-memory states.
    */
-  Map<String, Long> getNewSegmentCreationTimeMapFromExistingStates(IdealState idealState, ExternalView externalView,
+  Map<String, Long> getNewSegmentCreationTimeMapFromExistingStates(ExternalView externalView,
       Set<String> onlineSegments) {
     Map<String, Long> newSegmentCreationTimeMap = new HashMap<>();
     long currentTimeMs = _clock.millis();
-    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
+//    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
     Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
     for (String segment : onlineSegments) {
       NewSegmentState newSegmentState = _newSegmentStateMap.get(segment);
@@ -393,8 +402,8 @@ abstract class BaseInstanceSelector implements InstanceSelector {
       }
       // For recently created segment, check if it is qualified as new segment
       if (creationTimeMs > 0) {
-        assert idealStateAssignment.containsKey(segment);
-        if (isPotentialNewSegment(idealStateAssignment.get(segment), externalViewAssignment.get(segment))) {
+        assert externalViewAssignment.containsKey(segment);
+        if (isPotentialNewSegment(externalViewAssignment.get(segment))) {
           newSegmentCreationTimeMap.put(segment, creationTimeMs);
         }
       }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 8c352bdbe6..5b34371a8d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -96,7 +96,7 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
     int newSegmentMapCapacity = HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size());
     _newSegmentStateMap = new HashMap<>(newSegmentMapCapacity);
 
-    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
+//    Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
     Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
 
     // Get the online instances for the segments
@@ -104,14 +104,14 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
         new HashMap<>(HashUtil.getHashMapCapacity(onlineSegments.size()));
     Map<String, Set<String>> newSegmentToOnlineInstancesMap = new HashMap<>(newSegmentMapCapacity);
     for (String segment : onlineSegments) {
-      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
-      assert idealStateInstanceStateMap != null;
+//      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
+//      assert idealStateInstanceStateMap != null;
       Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment);
       Set<String> onlineInstances;
       if (externalViewInstanceStateMap == null) {
         onlineInstances = Collections.emptySet();
       } else {
-        onlineInstances = getOnlineInstances(idealStateInstanceStateMap, externalViewInstanceStateMap);
+        onlineInstances = getOnlineInstances(externalViewInstanceStateMap);
       }
       if (newSegmentCreationTimeMap.containsKey(segment)) {
         newSegmentToOnlineInstancesMap.put(segment, onlineInstances);
@@ -126,16 +126,18 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
     for (Map.Entry<String, Set<String>> entry : oldSegmentToOnlineInstancesMap.entrySet()) {
       String segment = entry.getKey();
       Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
-      Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
+//      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
+//      Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
+      Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment);
+      Set<String> instanceInExternalView = externalViewInstanceStateMap.keySet();
       Set<String> unavailableInstances =
-          unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> new HashSet<>());
-      for (String instance : instancesInIdealState) {
+          unavailableInstancesMap.computeIfAbsent(instanceInExternalView, k -> new HashSet<>());
+      for (String instance : instanceInExternalView) {
         if (!onlineInstances.contains(instance)) {
           if (unavailableInstances.add(instance)) {
             LOGGER.warn(
                 "Found unavailable instance: {} in instance group: {} for segment: {}, table: {} (IS: {}, EV: {})",
-                instance, instancesInIdealState, segment, _tableNameWithType, idealStateInstanceStateMap,
+                instance, instanceInExternalView, segment, _tableNameWithType, externalViewInstanceStateMap,
                 externalViewAssignment.get(segment));
           }
         }
@@ -147,8 +149,9 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
       String segment = entry.getKey();
       // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
       Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
-      Set<String> unavailableInstances = unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
+      Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment);
+//      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
+      Set<String> unavailableInstances = unavailableInstancesMap.get(externalViewInstanceStateMap.keySet());
       List<SegmentInstanceCandidate> candidates = new ArrayList<>(onlineInstances.size());
       for (String instance : onlineInstances) {
         if (!unavailableInstances.contains(instance)) {
@@ -161,11 +164,12 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
     for (Map.Entry<String, Set<String>> entry : newSegmentToOnlineInstancesMap.entrySet()) {
       String segment = entry.getKey();
       Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
+      Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment);
+//      Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
       Set<String> unavailableInstances =
-          unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), Collections.emptySet());
-      List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size());
-      for (String instance : convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+          unavailableInstancesMap.getOrDefault(externalViewInstanceStateMap.keySet(), Collections.emptySet());
+      List<SegmentInstanceCandidate> candidates = new ArrayList<>(externalViewInstanceStateMap.size());
+      for (String instance : convertToSortedMap(externalViewInstanceStateMap).keySet()) {
         if (!unavailableInstances.contains(instance)) {
           candidates.add(new SegmentInstanceCandidate(instance, onlineInstances.contains(instance)));
         }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d0af31044f..2b9431b0f2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -256,8 +256,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
       int nReplicas = 0;
       int nIdeal = 0;
       nSegments++;
+      Map<String, String> partitionMap = idealState.getInstanceStateMap(partitionName);
+      if (partitionMap == null) {
+        continue;
+      }
       // Skip segments not online in ideal state
-      for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
+      for (Map.Entry<String, String> serverAndState : partitionMap.entrySet()) {
         if (serverAndState == null) {
           break;
         }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
new file mode 100644
index 0000000000..ec640131cb
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import org.apache.helix.model.StateModelDefinition;
+
+
+/**
+ * Realtime segment state model generator describes the transitions for realtime segment states,
+ * which include the CONSUMING state in addition to ONLINE, OFFLINE and DROPPED.
+ *
+ * Offline to Consuming, Consuming to Online, Consuming to Offline, Online to Offline, Offline to Dropped
+ *
+ * There is no direct Offline to Online transition, so ONLINE is reachable only from CONSUMING.
+ */
+public class PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator {
+  private PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator() {
+  }
+
+  public static final String PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL =
+      "RealtimeSegmentOnlineOfflineStateModel";
+
+  public static final String ONLINE_STATE = "ONLINE";
+  public static final String CONSUMING_STATE = "CONSUMING";
+  public static final String OFFLINE_STATE = "OFFLINE";
+  public static final String DROPPED_STATE = "DROPPED";
+
+  public static StateModelDefinition generatePinotStateModelDefinition() {
+    StateModelDefinition.Builder builder =
+        new StateModelDefinition.Builder(PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL);
+    builder.initialState(OFFLINE_STATE);
+    // Register all states of the model; OFFLINE (set above) is the initial state.
+    builder.addState(ONLINE_STATE);
+    builder.addState(CONSUMING_STATE);
+    builder.addState(OFFLINE_STATE);
+    builder.addState(DROPPED_STATE);
+
+    // Allowed transitions; ONLINE is reachable only from CONSUMING:
+    // OFFLINE -> CONSUMING -> ONLINE, CONSUMING/ONLINE -> OFFLINE, OFFLINE -> DROPPED.
+    builder.addTransition(CONSUMING_STATE, ONLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, CONSUMING_STATE);
+//    builder.addTransition(OFFLINE_STATE, ONLINE_STATE);
+    builder.addTransition(CONSUMING_STATE, OFFLINE_STATE);
+    builder.addTransition(ONLINE_STATE, OFFLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, DROPPED_STATE);
+
+    // ONLINE has a dynamic upper bound: "R" means the bound is derived from the
+    // replication factor instead of being a fixed (static) number.
+    builder.dynamicUpperBound(ONLINE_STATE, "R");
+    // NOTE(review): no upper bound is set for CONSUMING; confirm whether it should
+    // also be bounded by "R".
+
+    StateModelDefinition statemodelDefinition = builder.build();
+    return statemodelDefinition;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ae208715fd..0a308fb514 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1583,17 +1583,19 @@ public class PinotHelixResourceManager {
     Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
         "Invalid table type: %s", tableType);
 
-    IdealState idealState;
-    if (tableType == TableType.REALTIME) {
-      idealState =
-           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-               _enableBatchMessageMode);
-    } else {
-      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
-      idealState =
-          PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-              _enableBatchMessageMode);
-    }
+    IdealState idealState =
+        PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+            _enableBatchMessageMode);
+//    if (tableType == TableType.REALTIME) {
+//      idealState =
+//           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+//               _enableBatchMessageMode);
+//    } else {
+//      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
+//      idealState =
+//          PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+//              _enableBatchMessageMode);
+//    }
    // IdealState idealState =
    //     PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
    //         _enableBatchMessageMode);
@@ -2270,8 +2272,10 @@ public class PinotHelixResourceManager {
             TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
             if (tableType == TableType.REALTIME) {
               // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated
-              currentAssignment.put(segmentName,
-                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+              currentAssignmentList.put(segmentName, Collections.emptyList()
+                  /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
+//              currentAssignment.put(segmentName,
+//                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
             } else {
               // TODO: Assess whether to pass in an empty instance list or to set the preferred list
               currentAssignmentList.put(segmentName, Collections.emptyList()
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 63222df7e3..f02ea86dc7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -23,10 +23,12 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.slf4j.Logger;
@@ -57,12 +59,23 @@ public class PinotTableIdealStateBuilder {
   public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas,
       boolean enableBatchMessageMode) {
     LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    String stateModel;
+    if (tableType == null) {
+      throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType);
+    } else if (TableType.OFFLINE.equals(tableType)) {
+      stateModel =
+          PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    } else {
+      stateModel =
+          PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    }
+
     // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default
     // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too
     FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
     idealStateBuilder
-        .setStateModel(
-            PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setStateModel(stateModel)
         .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
         // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
         .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index f4192a5a1a..03d72600c7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -25,6 +25,7 @@ import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -141,8 +142,12 @@ public class MissingConsumingSegmentFinder {
             // Note that there is no problem in case the partition group has reached its end of life.
             SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher
                 .fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName());
+            String endOffset = segmentZKMetadata.getEndOffset();
+            if (StringUtils.isEmpty(endOffset)) {
+              return;
+            }
             StreamPartitionMsgOffset completedSegmentEndOffset =
-                _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
+                _streamPartitionMsgOffsetFactory.create(endOffset);
             if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
               // there are unconsumed messages available on the stream
               missingSegmentInfo._totalCount++;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 003f985e3f..88f3c1431c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -46,6 +46,7 @@ import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -71,7 +72,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
-import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
@@ -448,6 +448,18 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  @VisibleForTesting
+  ExternalView getExternalView(String realtimeTableName) {
+    try {
+      ExternalView externalView = HelixHelper.getExternalViewForResource(_helixAdmin, _clusterName, realtimeTableName);
+      Preconditions.checkState(externalView != null, "Failed to find ExternalView for table: " + realtimeTableName);
+      return externalView;
+    } catch (Exception e) {
+      _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
+      throw e;
+    }
+  }
+
   /**
    * This method moves the segment file from another location to its permanent location.
    * When splitCommit is enabled, segment file is uploaded to the segmentLocation in the committingSegmentDescriptor,
@@ -521,9 +533,17 @@ public class PinotLLCRealtimeSegmentManager {
     TableConfig tableConfig = getTableConfig(realtimeTableName);
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
     IdealState idealState = getIdealState(realtimeTableName);
-    Preconditions.checkState(
-        idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
-        "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+    ExternalView externalView = getExternalView(realtimeTableName);
+    // Check whether there is at least 1 replica in ONLINE state for full-auto mode.
+    if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
+      Preconditions.checkState(
+          externalView.getStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE),
+          "Failed to find instance in ONLINE state in IdealState for segment: %s", committingSegmentName);
+    } else {
+      Preconditions.checkState(
+          idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+          "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+    }
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     /*
@@ -993,9 +1013,11 @@ public class PinotLLCRealtimeSegmentManager {
     // TODO: Need to figure out the best way to handle committed segments' state change
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
-      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
-      instanceStatesMap.put(committingSegmentName,
-          SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+//      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+//      instanceStatesMap.put(committingSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+      instanceStatesList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
     }
 
@@ -1029,14 +1051,14 @@ public class PinotLLCRealtimeSegmentManager {
       List<String> instancesAssigned =
           segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
       // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
-      instanceStatesMap.put(newSegmentName,
-          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+//      instanceStatesMap.put(newSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
       // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list.
       //       Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support
       //       this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it
       //       looks like it does)
-      // instanceStatesList.put(newSegmentName, Collections.emptyList()
-      //    /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+       instanceStatesList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
     }
   }
@@ -1247,8 +1269,11 @@ public class PinotLLCRealtimeSegmentManager {
                     newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
                     segmentAssignment, instancePartitionsMap, startOffset);
               } else {
-                LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
-                    latestSegmentName);
+                LOGGER.error(
+                    "Got unexpected instance state map: {} for segment: {}. Segment status: {}. "
+                        + "recreateDeletedConsumingSegment: {}",
+                    instanceStateMap, latestSegmentName, latestSegmentZKMetadata.getStatus(),
+                    recreateDeletedConsumingSegment);
               }
             }
             // else, the partition group has reached end of life. This is an acceptable state
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 61f6112365..033bf59405 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -47,7 +47,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
-import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +82,7 @@ public class HelixSetupUtils {
         Map<String, String> configMap = new HashMap<>();
         configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.toString(true));
         configMap.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
-            Boolean.toString(true));
+            Boolean.toString(false));
         configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(DEFAULT_ENABLE_CASE_INSENSITIVE));
         configMap.put(DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M));
         configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.toString(false));
@@ -139,23 +139,24 @@ public class HelixSetupUtils {
 
   private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
       HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) {
-    String segmentStateModelName =
-        PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+    String realtimeSegmentStateModelName =
+        PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition stateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, realtimeSegmentStateModelName);
     if (stateModelDefinition == null || isUpdateStateModel) {
       if (stateModelDefinition == null) {
-        LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName);
+        LOGGER.info("Adding state model: {} with CONSUMING state", realtimeSegmentStateModelName);
       } else {
-        LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName);
+        LOGGER.info("Updating state model: {} to contain CONSUMING state", realtimeSegmentStateModelName);
       }
-      helixDataAccessor
-          .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+      helixDataAccessor.createStateModelDef(
+          PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
 
     String offlineSegmentStateModelName =
         PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName,
-        offlineSegmentStateModelName);
+    StateModelDefinition offlineStateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, offlineSegmentStateModelName);
     if (offlineStateModelDefinition == null || isUpdateStateModel) {
       if (stateModelDefinition == null) {
         LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 282f3c72a1..5cdb3dda3a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -862,6 +862,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING;
   }
 
+  /**
+   * Exposes the current position of this segment data manager in its consumption state machine.
+   *
+   * @return the value of {@code _state} at the time of the call
+   */
+  public State getState() {
+    return this._state;
+  }
+
   /**
    * Returns the timestamp of the last consumed message.
    */
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 2389fe8ba6..50693aaf73 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -500,8 +500,12 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr
           }
         }
       });
-      Assert.assertTrue("No consuming segment found in partition " + partition, seqNum.get() >= 0);
-      return seqNum.get();
+      if (seqNum.get() == -1) {
+        return 0;
+      } else {
+        Assert.assertTrue("No consuming segment found in partition " + partition, seqNum.get() >= 0);
+        return seqNum.get();
+      }
     }
 
     public class ExceptingKafkaConsumer extends KafkaPartitionLevelConsumer {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6e0d157075..9862ffb7dd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -579,15 +579,20 @@ public abstract class BaseServerStarter implements ServiceStartable {
     Tracing.ThreadAccountantOps
         .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
     initSegmentFetcher(_serverConf);
-    StateModelFactory<?> stateModelFactoryWithRealtime =
-        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+//    StateModelFactory<?> stateModelFactoryWithRealtime =
+//        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
     StateModelFactory<?> stateModelFactory =
         new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+    StateModelFactory<?> realtimeSegmentStateModelFactory =
+        new RealtimeSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
     _helixManager.getStateMachineEngine()
         .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
     _helixManager.getStateMachineEngine()
-        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
-            stateModelFactoryWithRealtime);
+        .registerStateModelFactory(RealtimeSegmentOnlineOfflineStateModelFactory.getStateModelName(),
+            realtimeSegmentStateModelFactory);
+//    _helixManager.getStateMachineEngine()
+//        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+//            stateModelFactoryWithRealtime);
     // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
     // the property store, but before receiving state transitions
     _helixManager.addPreConnectCallback(_serverInstance::startDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..d3a937eaef
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Data Server layer state model to take over how to operate on:
+ * 1. Add a new segment
+ * 2. Refresh an existing, currently-serving segment.
+ * 3. Delete an existing segment.
+ *
+ * This state model is for REALTIME table segments and, unlike the OFFLINE segment state model, handles CONSUMING.
+ */
+public class RealtimeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private final String _instanceId;
+  private final InstanceDataManager _instanceDataManager;
+
+  /**
+   * @param instanceId id of this server instance; also used to name each state model's logger
+   * @param instanceDataManager manager through which segments are added, offloaded and deleted on this server
+   */
+  public RealtimeSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+    this._instanceId = instanceId;
+    this._instanceDataManager = instanceDataManager;
+  }
+
+  /**
+   * Returns the Helix state model name under which this factory is registered.
+   *
+   * <p>NOTE(review): this literal presumably has to equal the controller-side REALTIME segment state model name
+   * (PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL); it is duplicated here rather than shared. Confirm.
+   */
+  public static String getStateModelName() {
+    final String stateModelName = "RealtimeSegmentOnlineOfflineStateModel";
+    return stateModelName;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    return new RealtimeSegmentOnlineOfflineStateModel();
+  }
+
+
+  // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field.
+  // The transitions in the helix messages indicate the from/to states, and helix uses the
+  // Transition annotations (but only if StateModelInfo is defined).
+  @SuppressWarnings("unused")
+  @StateModelInfo(states = "{'OFFLINE', 'CONSUMING', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+  public class RealtimeSegmentOnlineOfflineStateModel extends StateModel {
+    private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - RealtimeSegmentOnlineOfflineStateModel");
+
+    /**
+     * OFFLINE -> CONSUMING: creates the consuming segment on this server via
+     * {@link InstanceDataManager#addRealtimeSegment}.
+     *
+     * <p>On failure, the error is logged and recorded as a segment error on the table data manager (when the table
+     * is known). It is then rethrown to the caller.
+     */
+    @Transition(from = "OFFLINE", to = "CONSUMING")
+    public void onBecomeConsumingFromOffline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : {}", message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s",
+                realtimeTableName, segmentName);
+        _logger.error(errorMessage, e);
+        // The table may not exist if the failure happened while creating it, so only record when it is present
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+        if (tableDataManager != null) {
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "CONSUMING", to = "ONLINE")
+    public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      Preconditions.checkNotNull(tableType);
+
+      // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+//      if (tableType == TableType.OFFLINE) {
+//        try {
+//          _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName);
+//        } catch (Exception e) {
+//          String errorMessage = String.format(
+//              "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+//              realtimeTableName, segmentName);
+//          _logger.error(errorMessage, e);
+//          TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+//          if (tableDataManager != null) {
+//            tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(),
+//                errorMessage, e));
+//          }
+//          Utils.rethrowException(e);
+//        }
+//      } else {
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToOnline(segmentName);
+      boolean isConsumingDone = false;
+      while (!isConsumingDone) {
+        SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName);
+        // For this transition to be correct in helix, we should already have a segment that is consuming
+        Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName,
+            realtimeTableName);
+
+        // TODO: https://github.com/apache/pinot/issues/10049
+        try {
+          // This indicates that the realtime segment is already a completed immutable segment.
+          // So nothing to do for this transition.
+          if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) {
+            // We found an LLC segment that is not consuming right now, must be that we already swapped it with a
+            // segment that has been built. Nothing to do for this state transition.
+            _logger.info(
+                "Segment {} not an instance of RealtimeSegmentDataManager. Reporting success for the transition",
+                acquiredSegment.getSegmentName());
+            return;
+          }
+          RealtimeSegmentDataManager segmentDataManager = (RealtimeSegmentDataManager) acquiredSegment;
+          RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+          if ((!state.isFinal()) || state.shouldConsume()) {
+            Thread.sleep(10_000L);
+          } else {
+            SegmentZKMetadata segmentZKMetadata =
+                ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName,
+                    segmentName);
+            segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
+            isConsumingDone = true;
+          }
+
+//          // Use a single thread to wait for the consuming segment to be fully completed
+//          ExecutorService executorService = Executors.newSingleThreadExecutor();
+//          Future<Boolean> future = executorService.submit(() -> {
+//            try {
+//              while (true) {
+//                RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+//                if (state.shouldConsume()) {
+//                  Thread.sleep(5_000L);
+//                } else {
+//                  segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
+//                  return true;
+//                }
+////              RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+////              if (state.isFinal()) {
+////                return true;
+////              }
+//              }
+//            } catch (InterruptedException e) {
+//              throw new RuntimeException(e);
+//            }
+//          });
+//          future.get();
+//          executorService.shutdown();
+        } catch (Exception e) {
+          String errorMessage =
+              String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+                  realtimeTableName, segmentName);
+          _logger.error(errorMessage, e);
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          Utils.rethrowException(e);
+        } finally {
+          tableDataManager.releaseSegment(acquiredSegment);
+        }
+      }
+    }
+
+    /**
+     * CONSUMING -> OFFLINE: offloads the segment from this server through
+     * {@link InstanceDataManager#offloadSegment}. Any failure is logged and rethrown.
+     */
+    @Transition(from = "CONSUMING", to = "OFFLINE")
+    public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : " + message);
+      String tableName = message.getResourceName();
+      String segment = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableName, segment);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}",
+            tableName, segment, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    /**
+     * CONSUMING -> DROPPED: notifies the table data manager, then offloads and deletes the segment on this server.
+     * A missing table fails the transition immediately. Failures while offloading or deleting are logged and
+     * rethrown.
+     */
+    @Transition(from = "CONSUMING", to = "DROPPED")
+    public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message);
+      String tableName = message.getResourceName();
+      String segment = message.getPartitionName();
+      TableDataManager dataManager = _instanceDataManager.getTableDataManager(tableName);
+      Preconditions.checkState(dataManager != null, "Failed to find table: %s", tableName);
+      dataManager.onConsumingToDropped(segment);
+      try {
+        _instanceDataManager.offloadSegment(tableName, segment);
+        _instanceDataManager.deleteSegment(tableName, segment);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}",
+            tableName, segment, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+
+//    @Transition(from = "OFFLINE", to = "ONLINE")
+//    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+//      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message);
+//      String tableNameWithType = message.getResourceName();
+//      String segmentName = message.getPartitionName();
+//      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+//      Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME),
+//          "TableType is null or is a REALTIME table, offline state model should not be called fo RT");
+//      try {
+//        _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
+//      } catch (Exception e) {
+//        String errorMessage =
+//            String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s",
+//                tableNameWithType, segmentName);
+//        _logger.error(errorMessage, e);
+//        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
+//        if (tableDataManager != null) {
+//          tableDataManager.addSegmentError(segmentName,
+//              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+//        }
+//        Utils.rethrowException(e);
+//      }
+//    }
+
+    // Remove segment from InstanceDataManager.
+    // Still keep the data files in local.
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Delete segment from local directory.
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    /**
+     * Handles the Helix ERROR -> OFFLINE transition.
+     *
+     * <p>This handler only logs the transition message; it does not touch the segment or the instance data
+     * manager.
+     *
+     * @param message Helix transition message (only logged)
+     * @param context Helix notification context (not used by this handler)
+     */
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      // Parameterized form: the message is only rendered when INFO is enabled.
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : {}", message);
+    }
+
+    @Transition(from = "ERROR", to = "DROPPED")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+  }
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
index 9d1dcd7e14..018ee3b6e8 100644
--- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
@@ -6,7 +6,7 @@
     "timeColumnName": "DaysSinceEpoch",
     "retentionTimeUnit": "DAYS",
     "retentionTimeValue": "5",
-    "replication": "1"
+    "replication": "3"
   },
   "tableIndexConfig": {},
   "routing": {
@@ -26,7 +26,7 @@
           "stream.kafka.zk.broker.url": "localhost:2191/kafka",
           "stream.kafka.broker.list": "localhost:19092",
           "realtime.segment.flush.threshold.time": "3600000",
-          "realtime.segment.flush.threshold.size": "50000"
+          "realtime.segment.flush.threshold.size": "500"
         }
       ]
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org