You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2024/02/15 07:12:20 UTC

(pinot) 01/01: Initial POC code for hybrid table

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit e4acc6e70c2be0125fbdd54dd3c2fe99db0d705d
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Mon Feb 12 23:36:47 2024 -0800

    Initial POC code for hybrid table
---
 .../controller/helix/SegmentStatusChecker.java     |   6 +-
 ...imeSegmentOnlineOfflineStateModelGenerator.java |  72 +++++
 .../helix/core/PinotHelixResourceManager.java      |  30 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  17 +-
 .../realtime/MissingConsumingSegmentFinder.java    |   7 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  37 ++-
 .../helix/core/util/HelixSetupUtils.java           |  21 +-
 .../realtime/RealtimeSegmentDataManager.java       |   7 +
 .../tests/LLCRealtimeClusterIntegrationTest.java   |   8 +-
 .../server/starter/helix/BaseServerStarter.java    |  13 +-
 ...ltimeSegmentOnlineOfflineStateModelFactory.java | 325 +++++++++++++++++++++
 .../airlineStats_realtime_table_config.json        |   4 +-
 12 files changed, 499 insertions(+), 48 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d0af31044f..2b9431b0f2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -256,8 +256,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
       int nReplicas = 0;
       int nIdeal = 0;
       nSegments++;
+      Map<String, String> partitionMap = idealState.getInstanceStateMap(partitionName);
+      if (partitionMap == null) {
+        continue;
+      }
       // Skip segments not online in ideal state
-      for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
+      for (Map.Entry<String, String> serverAndState : partitionMap.entrySet()) {
         if (serverAndState == null) {
           break;
         }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
new file mode 100644
index 0000000000..ec640131cb
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import org.apache.helix.model.StateModelDefinition;
+
+
+/**
+ * Realtime Segment state model generator describes the transitions for realtime segment states.
+ *
+ * Offline to Consuming, Offline to Dropped
+ * Consuming to Online, Consuming to Offline, Online to Offline
+ *
+ * The direct Offline to Online transition is commented out below, so it is not part of this state model.
+ */
+public class PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator {
+  private PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator() {
+  }
+
+  public static final String PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL =
+      "RealtimeSegmentOnlineOfflineStateModel";
+
+  public static final String ONLINE_STATE = "ONLINE";
+  public static final String CONSUMING_STATE = "CONSUMING";
+  public static final String OFFLINE_STATE = "OFFLINE";
+  public static final String DROPPED_STATE = "DROPPED";
+
+  public static StateModelDefinition generatePinotStateModelDefinition() {
+    StateModelDefinition.Builder builder =
+        new StateModelDefinition.Builder(PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL);
+    builder.initialState(OFFLINE_STATE);
+
+    builder.addState(ONLINE_STATE);
+    builder.addState(CONSUMING_STATE);
+    builder.addState(OFFLINE_STATE);
+    builder.addState(DROPPED_STATE);
+    // OFFLINE was set as the initial state above; all four states of the model are now registered.
+
+    // Add transitions between the states.
+    builder.addTransition(CONSUMING_STATE, ONLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, CONSUMING_STATE);
+//    builder.addTransition(OFFLINE_STATE, ONLINE_STATE);
+    builder.addTransition(CONSUMING_STATE, OFFLINE_STATE);
+    builder.addTransition(ONLINE_STATE, OFFLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, DROPPED_STATE);
+
+    // Set constraints on states.
+    // Dynamic constraint: "R" means the ONLINE upper bound should be derived based on the replication factor.
+    builder.dynamicUpperBound(ONLINE_STATE, "R");
+    // No upper bound is configured for the CONSUMING state in this definition.
+    // NOTE(review): confirm whether CONSUMING should also be bounded by "R".
+
+    StateModelDefinition statemodelDefinition = builder.build();
+    return statemodelDefinition;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ae208715fd..0a308fb514 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1583,17 +1583,19 @@ public class PinotHelixResourceManager {
     Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
         "Invalid table type: %s", tableType);
 
-    IdealState idealState;
-    if (tableType == TableType.REALTIME) {
-      idealState =
-           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-               _enableBatchMessageMode);
-    } else {
-      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
-      idealState =
-          PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
-              _enableBatchMessageMode);
-    }
+    IdealState idealState =
+        PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+            _enableBatchMessageMode);
+//    if (tableType == TableType.REALTIME) {
+//      idealState =
+//           PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+//               _enableBatchMessageMode);
+//    } else {
+//      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
+//      idealState =
+//          PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+//              _enableBatchMessageMode);
+//    }
    // IdealState idealState =
    //     PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
    //         _enableBatchMessageMode);
@@ -2270,8 +2272,10 @@ public class PinotHelixResourceManager {
             TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
             if (tableType == TableType.REALTIME) {
               // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated
-              currentAssignment.put(segmentName,
-                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+              currentAssignmentList.put(segmentName, Collections.emptyList()
+                  /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
+//              currentAssignment.put(segmentName,
+//                  SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
             } else {
               // TODO: Assess whether to pass in an empty instance list or to set the preferred list
               currentAssignmentList.put(segmentName, Collections.emptyList()
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 63222df7e3..f02ea86dc7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -23,10 +23,12 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.slf4j.Logger;
@@ -57,12 +59,23 @@ public class PinotTableIdealStateBuilder {
   public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas,
       boolean enableBatchMessageMode) {
     LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    String stateModel;
+    if (tableType == null) {
+      throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType);
+    } else if (TableType.OFFLINE.equals(tableType)) {
+      stateModel =
+          PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    } else {
+      stateModel =
+          PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    }
+
     // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default
     // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too
     FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
     idealStateBuilder
-        .setStateModel(
-            PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setStateModel(stateModel)
         .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
         // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
         .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index f4192a5a1a..03d72600c7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -25,6 +25,7 @@ import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -141,8 +142,12 @@ public class MissingConsumingSegmentFinder {
             // Note that there is no problem in case the partition group has reached its end of life.
             SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher
                 .fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName());
+            String endOffset = segmentZKMetadata.getEndOffset();
+            if (StringUtils.isEmpty(endOffset)) {
+              return;
+            }
             StreamPartitionMsgOffset completedSegmentEndOffset =
-                _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
+                _streamPartitionMsgOffsetFactory.create(endOffset);
             if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
               // there are unconsumed messages available on the stream
               missingSegmentInfo._totalCount++;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 003f985e3f..4ccce0e1c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -71,7 +71,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
-import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
@@ -521,9 +520,16 @@ public class PinotLLCRealtimeSegmentManager {
     TableConfig tableConfig = getTableConfig(realtimeTableName);
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
     IdealState idealState = getIdealState(realtimeTableName);
-    Preconditions.checkState(
-        idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
-        "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+    // Check whether there is at least 1 replica in ONLINE state for full-auto mode.
+    if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
+      Preconditions.checkState(
+          idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE),
+          "Failed to find instance in ONLINE state in IdealState for segment: %s", committingSegmentName);
+    } else {
+      Preconditions.checkState(
+          idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+          "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+    }
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     /*
@@ -993,9 +999,11 @@ public class PinotLLCRealtimeSegmentManager {
     // TODO: Need to figure out the best way to handle committed segments' state change
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
-      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
-      instanceStatesMap.put(committingSegmentName,
-          SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+//      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+//      instanceStatesMap.put(committingSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+      instanceStatesList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
     }
 
@@ -1029,14 +1037,14 @@ public class PinotLLCRealtimeSegmentManager {
       List<String> instancesAssigned =
           segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
       // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
-      instanceStatesMap.put(newSegmentName,
-          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+//      instanceStatesMap.put(newSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
       // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list.
       //       Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support
       //       this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it
       //       looks like it does)
-      // instanceStatesList.put(newSegmentName, Collections.emptyList()
-      //    /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+       instanceStatesList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
     }
   }
@@ -1247,8 +1255,11 @@ public class PinotLLCRealtimeSegmentManager {
                     newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
                     segmentAssignment, instancePartitionsMap, startOffset);
               } else {
-                LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
-                    latestSegmentName);
+                LOGGER.error(
+                    "Got unexpected instance state map: {} for segment: {}. Segment status: {}. "
+                        + "recreateDeletedConsumingSegment: {}",
+                    instanceStateMap, latestSegmentName, latestSegmentZKMetadata.getStatus(),
+                    recreateDeletedConsumingSegment);
               }
             }
             // else, the partition group has reached end of life. This is an acceptable state
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 61f6112365..a14010f17f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -47,7 +47,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
-import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,23 +139,24 @@ public class HelixSetupUtils {
 
   private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
       HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) {
-    String segmentStateModelName =
-        PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+    String realtimeSegmentStateModelName =
+        PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition stateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, realtimeSegmentStateModelName);
     if (stateModelDefinition == null || isUpdateStateModel) {
       if (stateModelDefinition == null) {
-        LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName);
+        LOGGER.info("Adding state model: {} with CONSUMING state", realtimeSegmentStateModelName);
       } else {
-        LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName);
+        LOGGER.info("Updating state model: {} to contain CONSUMING state", realtimeSegmentStateModelName);
       }
-      helixDataAccessor
-          .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+      helixDataAccessor.createStateModelDef(
+          PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
 
     String offlineSegmentStateModelName =
         PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName,
-        offlineSegmentStateModelName);
+    StateModelDefinition offlineStateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, offlineSegmentStateModelName);
     if (offlineStateModelDefinition == null || isUpdateStateModel) {
       if (stateModelDefinition == null) {
         LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 282f3c72a1..5cdb3dda3a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -862,6 +862,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING;
   }
 
+  /**
+   * Returns the current {@link State} of this realtime segment data manager, as held in {@code _state}.
+   */
+  public State getState() {
+    return _state;
+  }
+
   /**
    * Returns the timestamp of the last consumed message.
    */
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 2389fe8ba6..50693aaf73 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -500,8 +500,12 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr
           }
         }
       });
-      Assert.assertTrue("No consuming segment found in partition " + partition, seqNum.get() >= 0);
-      return seqNum.get();
+      if (seqNum.get() == -1) {
+        return 0;
+      } else {
+        Assert.assertTrue("No consuming segment found in partition " + partition, seqNum.get() >= 0);
+        return seqNum.get();
+      }
     }
 
     public class ExceptingKafkaConsumer extends KafkaPartitionLevelConsumer {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6e0d157075..9862ffb7dd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -579,15 +579,20 @@ public abstract class BaseServerStarter implements ServiceStartable {
     Tracing.ThreadAccountantOps
         .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
     initSegmentFetcher(_serverConf);
-    StateModelFactory<?> stateModelFactoryWithRealtime =
-        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+//    StateModelFactory<?> stateModelFactoryWithRealtime =
+//        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
     StateModelFactory<?> stateModelFactory =
         new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+    StateModelFactory<?> realtimeSegmentStateModelFactory =
+        new RealtimeSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
     _helixManager.getStateMachineEngine()
         .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
     _helixManager.getStateMachineEngine()
-        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
-            stateModelFactoryWithRealtime);
+        .registerStateModelFactory(RealtimeSegmentOnlineOfflineStateModelFactory.getStateModelName(),
+            realtimeSegmentStateModelFactory);
+//    _helixManager.getStateMachineEngine()
+//        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+//            stateModelFactoryWithRealtime);
     // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
     // the property store, but before receiving state transitions
     _helixManager.addPreConnectCallback(_serverInstance::startDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..d3a937eaef
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Data Server layer state model factory for REALTIME table segments. The state model it creates defines how the
+ * server reacts when Helix asks it to:
+ * 1. Start consuming a new segment (OFFLINE -> CONSUMING).
+ * 2. Turn a consuming segment into a completed, serving segment (CONSUMING -> ONLINE).
+ * 3. Offload or delete an existing segment (-> OFFLINE / -> DROPPED).
+ *
+ * In addition to the OFFLINE, ONLINE and DROPPED states, this model handles the CONSUMING state.
+ * NOTE(review): no OFFLINE -> ONLINE handler is defined here (the POC disabled it); presumably every segment is
+ * expected to pass through CONSUMING first - confirm against the state model definition on the controller side.
+ */
+public class RealtimeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  // How long CONSUMING -> ONLINE waits between two checks of the consuming segment's state.
+  private static final long CONSUMING_TO_ONLINE_POLL_INTERVAL_MS = 10_000L;
+
+  private final String _instanceId;
+  private final InstanceDataManager _instanceDataManager;
+
+  /**
+   * @param instanceId id of this server instance, only used to name the logger of the created state models
+   * @param instanceDataManager data manager on which all segment operations of the transitions are performed
+   */
+  public RealtimeSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+    _instanceId = instanceId;
+    _instanceDataManager = instanceDataManager;
+  }
+
+  /**
+   * Returns the name under which this factory is registered with the Helix state machine engine.
+   */
+  public static String getStateModelName() {
+    return "RealtimeSegmentOnlineOfflineStateModel";
+  }
+
+  /**
+   * Creates a fresh state model; neither the resource name nor the partition name is used.
+   */
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    return new RealtimeSegmentOnlineOfflineStateModel();
+  }
+
+
+  // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field.
+  // The transitions in the helix messages indicate the from/to states, and helix uses the
+  // Transition annotations (but only if StateModelInfo is defined).
+  @SuppressWarnings("unused")
+  @StateModelInfo(states = "{'OFFLINE', 'CONSUMING', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+  public class RealtimeSegmentOnlineOfflineStateModel extends StateModel {
+    private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - RealtimeSegmentOnlineOfflineStateModel");
+
+    /**
+     * OFFLINE -> CONSUMING: adds the segment named by the message's partition as a realtime segment of the table
+     * named by the message's resource. On failure the error is logged, recorded on the table data manager (when the
+     * table is known) and rethrown.
+     */
+    @Transition(from = "OFFLINE", to = "CONSUMING")
+    public void onBecomeConsumingFromOffline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : {}", message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+
+      // TODO: The POC dropped the guard that treated this transition as a no-op for OFFLINE tables, on the assumption
+      //  that OFFLINE and REALTIME tables use separate state models. Restore it if the models get merged again.
+
+      try {
+        _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s",
+                realtimeTableName, segmentName);
+        _logger.error(errorMessage, e);
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+        if (tableDataManager != null) {
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        Utils.rethrowException(e);
+      }
+    }
+
+    /**
+     * CONSUMING -> ONLINE: waits until the consuming segment has reached a final state and no longer needs to
+     * consume, then asks it to go online using the segment ZK metadata. Returns immediately when the segment is no
+     * longer a {@link RealtimeSegmentDataManager}.
+     *
+     * NOTE(review): the wait blocks the calling thread in {@link #CONSUMING_TO_ONLINE_POLL_INTERVAL_MS} steps and has
+     * no upper bound; consider adding a timeout or waiting on a dedicated thread.
+     *
+     * @throws NullPointerException if the table type cannot be derived from the resource name
+     * @throws IllegalStateException if the table or the segment is not found on this server
+     */
+    @Transition(from = "CONSUMING", to = "ONLINE")
+    public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : {}", message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      Preconditions.checkNotNull(tableType);
+
+      // TODO: The POC dropped the branch that called addOrReplaceSegment() for OFFLINE tables, on the assumption that
+      //  OFFLINE and REALTIME tables use separate state models. Restore it if the models get merged again.
+
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToOnline(segmentName);
+      boolean isConsumingDone = false;
+      // Every iteration acquires the segment, inspects it and releases it again in the finally block.
+      while (!isConsumingDone) {
+        SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName);
+        // For this transition to be correct in helix, we should already have a segment that is consuming
+        Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName,
+            realtimeTableName);
+
+        // TODO: https://github.com/apache/pinot/issues/10049
+        try {
+          // This indicates that the realtime segment is already a completed immutable segment.
+          // So nothing to do for this transition.
+          if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) {
+            // We found an LLC segment that is not consuming right now, must be that we already swapped it with a
+            // segment that has been built. Nothing to do for this state transition.
+            _logger.info(
+                "Segment {} not an instance of RealtimeSegmentDataManager. Reporting success for the transition",
+                acquiredSegment.getSegmentName());
+            return;
+          }
+          RealtimeSegmentDataManager segmentDataManager = (RealtimeSegmentDataManager) acquiredSegment;
+          RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+          if ((!state.isFinal()) || state.shouldConsume()) {
+            // Still consuming (or not yet in a final state): check again after the poll interval.
+            Thread.sleep(CONSUMING_TO_ONLINE_POLL_INTERVAL_MS);
+          } else {
+            SegmentZKMetadata segmentZKMetadata =
+                ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName,
+                    segmentName);
+            segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
+            isConsumingDone = true;
+          }
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) {
+            // Catching InterruptedException clears the interrupt flag; restore it so that the code further up the
+            // stack can still observe that this thread was asked to stop.
+            Thread.currentThread().interrupt();
+          }
+          String errorMessage =
+              String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+                  realtimeTableName, segmentName);
+          _logger.error(errorMessage, e);
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          Utils.rethrowException(e);
+        } finally {
+          tableDataManager.releaseSegment(acquiredSegment);
+        }
+      }
+    }
+
+    /**
+     * CONSUMING -> OFFLINE: offloads the segment from the instance data manager. Failures are logged and rethrown.
+     */
+    @Transition(from = "CONSUMING", to = "OFFLINE")
+    public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : {}", message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}",
+            realtimeTableName, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    /**
+     * CONSUMING -> DROPPED: notifies the table data manager, then offloads and deletes the segment. Failures of the
+     * offload/delete step are logged and rethrown.
+     *
+     * @throws IllegalStateException if the table is not found on this server
+     */
+    @Transition(from = "CONSUMING", to = "DROPPED")
+    public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : {}", message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToDropped(segmentName);
+      try {
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+        _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}",
+            realtimeTableName, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // NOTE(review): the OFFLINE -> ONLINE handler (which called addOrReplaceSegment() for non-REALTIME tables) was
+    // disabled in this POC, so this model has no direct OFFLINE -> ONLINE transition.
+
+    /**
+     * ONLINE -> OFFLINE: removes the segment from the InstanceDataManager via offloadSegment(). The segment is not
+     * deleted here. Failures are logged and rethrown.
+     */
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    /**
+     * OFFLINE -> DROPPED: deletes the segment via the instance data manager. Failures are logged and rethrown.
+     */
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    /**
+     * ONLINE -> DROPPED: offloads the segment and then deletes it. Failures are logged and rethrown.
+     */
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    /**
+     * ERROR -> OFFLINE: only logs the message; no segment operation is performed.
+     */
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : {}", message);
+    }
+
+    /**
+     * ERROR -> DROPPED: deletes the segment via the instance data manager. Failures are logged and rethrown.
+     */
+    @Transition(from = "ERROR", to = "DROPPED")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+  }
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
index 9d1dcd7e14..018ee3b6e8 100644
--- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
@@ -6,7 +6,7 @@
     "timeColumnName": "DaysSinceEpoch",
     "retentionTimeUnit": "DAYS",
     "retentionTimeValue": "5",
-    "replication": "1"
+    "replication": "3"
   },
   "tableIndexConfig": {},
   "routing": {
@@ -26,7 +26,7 @@
           "stream.kafka.zk.broker.url": "localhost:2191/kafka",
           "stream.kafka.broker.list": "localhost:19092",
           "realtime.segment.flush.threshold.time": "3600000",
-          "realtime.segment.flush.threshold.size": "50000"
+          "realtime.segment.flush.threshold.size": "500"
         }
       ]
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org