You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2024/02/13 17:53:47 UTC
(pinot) 02/02: Initial POC code for hybrid table
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 35e96d1eb5a5662a62ab43dac4ad7ea9716f8b42
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Mon Feb 12 23:36:47 2024 -0800
Initial POC code for hybrid table
---
.../controller/helix/SegmentStatusChecker.java | 6 +-
...imeSegmentOnlineOfflineStateModelGenerator.java | 72 +++++
.../helix/core/PinotHelixResourceManager.java | 30 +-
.../helix/core/PinotTableIdealStateBuilder.java | 17 +-
.../realtime/MissingConsumingSegmentFinder.java | 7 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 30 +-
.../helix/core/util/HelixSetupUtils.java | 21 +-
.../realtime/RealtimeSegmentDataManager.java | 7 +
.../server/starter/helix/BaseServerStarter.java | 13 +-
...ltimeSegmentOnlineOfflineStateModelFactory.java | 325 +++++++++++++++++++++
.../airlineStats_realtime_table_config.json | 4 +-
11 files changed, 488 insertions(+), 44 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d0af31044f..2b9431b0f2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -256,8 +256,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
int nReplicas = 0;
int nIdeal = 0;
nSegments++;
+ Map<String, String> partitionMap = idealState.getInstanceStateMap(partitionName);
+ if (partitionMap == null) {
+ continue;
+ }
// Skip segments not online in ideal state
- for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
+ for (Map.Entry<String, String> serverAndState : partitionMap.entrySet()) {
if (serverAndState == null) {
break;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
new file mode 100644
index 0000000000..ec640131cb
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import org.apache.helix.model.StateModelDefinition;
+
+
+/**
+ * Realtime Segment state model generator describes the transitions for realtime segment states.
+ *
+ * Offline to Consuming, Consuming to Online, Consuming to Offline
+ * Online to Offline, Offline to Dropped
+ *
+ * Unlike the offline segment state model, this includes the CONSUMING state and has no Offline to Online transition
+ */
+public class PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator {
+ private PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator() {
+ }
+
+ public static final String PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL =
+ "RealtimeSegmentOnlineOfflineStateModel";
+
+ public static final String ONLINE_STATE = "ONLINE";
+ public static final String CONSUMING_STATE = "CONSUMING";
+ public static final String OFFLINE_STATE = "OFFLINE";
+ public static final String DROPPED_STATE = "DROPPED";
+
+ public static StateModelDefinition generatePinotStateModelDefinition() {
+ StateModelDefinition.Builder builder =
+ new StateModelDefinition.Builder(PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL);
+ builder.initialState(OFFLINE_STATE);
+
+ builder.addState(ONLINE_STATE);
+ builder.addState(CONSUMING_STATE);
+ builder.addState(OFFLINE_STATE);
+ builder.addState(DROPPED_STATE);
+ // Set the initial state when the node starts
+
+ // Add transitions between the states.
+ builder.addTransition(CONSUMING_STATE, ONLINE_STATE);
+ builder.addTransition(OFFLINE_STATE, CONSUMING_STATE);
+// builder.addTransition(OFFLINE_STATE, ONLINE_STATE);
+ builder.addTransition(CONSUMING_STATE, OFFLINE_STATE);
+ builder.addTransition(ONLINE_STATE, OFFLINE_STATE);
+ builder.addTransition(OFFLINE_STATE, DROPPED_STATE);
+
+ // set constraints on states.
+ // no static constraint is set; ONLINE uses the dynamic upper bound described below
+ builder.dynamicUpperBound(ONLINE_STATE, "R");
+ // dynamic constraint, R means it should be derived based on the replication
+ // factor.
+
+ StateModelDefinition statemodelDefinition = builder.build();
+ return statemodelDefinition;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ae208715fd..0a308fb514 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1583,17 +1583,19 @@ public class PinotHelixResourceManager {
Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
"Invalid table type: %s", tableType);
- IdealState idealState;
- if (tableType == TableType.REALTIME) {
- idealState =
- PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
- _enableBatchMessageMode);
- } else {
- // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
- idealState =
- PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
- _enableBatchMessageMode);
- }
+ IdealState idealState =
+ PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+ _enableBatchMessageMode);
+// if (tableType == TableType.REALTIME) {
+// idealState =
+// PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+// _enableBatchMessageMode);
+// } else {
+// // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
+// idealState =
+// PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+// _enableBatchMessageMode);
+// }
// IdealState idealState =
// PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
// _enableBatchMessageMode);
@@ -2270,8 +2272,10 @@ public class PinotHelixResourceManager {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
if (tableType == TableType.REALTIME) {
// TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+ currentAssignmentList.put(segmentName, Collections.emptyList()
+ /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
+// currentAssignment.put(segmentName,
+// SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
} else {
// TODO: Assess whether to pass in an empty instance list or to set the preferred list
currentAssignmentList.put(segmentName, Collections.emptyList()
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 63222df7e3..f02ea86dc7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -23,10 +23,12 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
@@ -57,12 +59,23 @@ public class PinotTableIdealStateBuilder {
public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas,
boolean enableBatchMessageMode) {
LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ String stateModel;
+ if (tableType == null) {
+ throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType);
+ } else if (TableType.OFFLINE.equals(tableType)) {
+ stateModel =
+ PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+ } else {
+ stateModel =
+ PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+ }
+
// FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default
// TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too
FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
idealStateBuilder
- .setStateModel(
- PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+ .setStateModel(stateModel)
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
// TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index f4192a5a1a..03d72600c7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -25,6 +25,7 @@ import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -141,8 +142,12 @@ public class MissingConsumingSegmentFinder {
// Note that there is no problem in case the partition group has reached its end of life.
SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher
.fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName());
+ String endOffset = segmentZKMetadata.getEndOffset();
+ if (StringUtils.isEmpty(endOffset)) {
+ return;
+ }
StreamPartitionMsgOffset completedSegmentEndOffset =
- _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
+ _streamPartitionMsgOffsetFactory.create(endOffset);
if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
// there are unconsumed messages available on the stream
missingSegmentInfo._totalCount++;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 003f985e3f..80aed78c04 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -71,7 +71,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
-import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
@@ -521,9 +520,16 @@ public class PinotLLCRealtimeSegmentManager {
TableConfig tableConfig = getTableConfig(realtimeTableName);
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
IdealState idealState = getIdealState(realtimeTableName);
- Preconditions.checkState(
- idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
- "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+ // Check whether there is at least 1 replica in ONLINE state for full-auto mode.
+ if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
+ Preconditions.checkState(
+ idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE),
+ "Failed to find instance in ONLINE state in IdealState for segment: %s", committingSegmentName);
+ } else {
+ Preconditions.checkState(
+ idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+ "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+ }
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
/*
@@ -993,9 +999,11 @@ public class PinotLLCRealtimeSegmentManager {
// TODO: Need to figure out the best way to handle committed segments' state change
if (committingSegmentName != null) {
// Change committing segment state to ONLINE
- Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
- instanceStatesMap.put(committingSegmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+// Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+// instanceStatesMap.put(committingSegmentName,
+// SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+ instanceStatesList.put(newSegmentName, Collections.emptyList()
+ /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
}
@@ -1029,14 +1037,14 @@ public class PinotLLCRealtimeSegmentManager {
List<String> instancesAssigned =
segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
// No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
- instanceStatesMap.put(newSegmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+// instanceStatesMap.put(newSegmentName,
+// SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
// TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list.
// Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support
// this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it
// looks like it does)
- // instanceStatesList.put(newSegmentName, Collections.emptyList()
- // /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+ instanceStatesList.put(newSegmentName, Collections.emptyList()
+ /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 61f6112365..a14010f17f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -47,7 +47,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
-import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,23 +139,24 @@ public class HelixSetupUtils {
private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) {
- String segmentStateModelName =
- PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
- StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+ String realtimeSegmentStateModelName =
+ PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+ StateModelDefinition stateModelDefinition =
+ helixAdmin.getStateModelDef(helixClusterName, realtimeSegmentStateModelName);
if (stateModelDefinition == null || isUpdateStateModel) {
if (stateModelDefinition == null) {
- LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName);
+ LOGGER.info("Adding state model: {} with CONSUMING state", realtimeSegmentStateModelName);
} else {
- LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName);
+ LOGGER.info("Updating state model: {} to contain CONSUMING state", realtimeSegmentStateModelName);
}
- helixDataAccessor
- .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+ helixDataAccessor.createStateModelDef(
+ PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
}
String offlineSegmentStateModelName =
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
- StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName,
- offlineSegmentStateModelName);
+ StateModelDefinition offlineStateModelDefinition =
+ helixAdmin.getStateModelDef(helixClusterName, offlineSegmentStateModelName);
if (offlineStateModelDefinition == null || isUpdateStateModel) {
if (stateModelDefinition == null) {
LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 282f3c72a1..5cdb3dda3a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -862,6 +862,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING;
}
+ /**
+ * Returns the state of the realtime segment.
+ */
+ public State getState() {
+ return _state;
+ }
+
/**
* Returns the timestamp of the last consumed message.
*/
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6e0d157075..9862ffb7dd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -579,15 +579,20 @@ public abstract class BaseServerStarter implements ServiceStartable {
Tracing.ThreadAccountantOps
.initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
initSegmentFetcher(_serverConf);
- StateModelFactory<?> stateModelFactoryWithRealtime =
- new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+// StateModelFactory<?> stateModelFactoryWithRealtime =
+// new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
StateModelFactory<?> stateModelFactory =
new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+ StateModelFactory<?> realtimeSegmentStateModelFactory =
+ new RealtimeSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
_helixManager.getStateMachineEngine()
.registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
_helixManager.getStateMachineEngine()
- .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
- stateModelFactoryWithRealtime);
+ .registerStateModelFactory(RealtimeSegmentOnlineOfflineStateModelFactory.getStateModelName(),
+ realtimeSegmentStateModelFactory);
+// _helixManager.getStateMachineEngine()
+// .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+// stateModelFactoryWithRealtime);
// Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
// the property store, but before receiving state transitions
_helixManager.addPreConnectCallback(_serverInstance::startDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..58c785583a
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Data Server layer state model that takes over how to operate on:
+ * 1. Add a new segment
+ * 2. Refresh an existing segment that is currently being served.
+ * 3. Delete an existing segment.
+ *
+ * This is the state model for REALTIME table segments and handles the CONSUMING state
+ */
+public class RealtimeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+ private final String _instanceId;
+ private final InstanceDataManager _instanceDataManager;
+
+ public RealtimeSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+ _instanceId = instanceId;
+ _instanceDataManager = instanceDataManager;
+ }
+
+ public static String getStateModelName() {
+ return "RealtimeSegmentOnlineOfflineStateModel";
+ }
+
+ @Override
+ public StateModel createNewStateModel(String resourceName, String partitionName) {
+ return new RealtimeSegmentOnlineOfflineStateModel();
+ }
+
+
+ // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field.
+ // The transitions in the helix messages indicate the from/to states, and helix uses the
+ // Transition annotations (but only if StateModelInfo is defined).
+ @SuppressWarnings("unused")
+ @StateModelInfo(states = "{'OFFLINE', 'CONSUMING', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+ public class RealtimeSegmentOnlineOfflineStateModel extends StateModel {
+ private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - RealtimeSegmentOnlineOfflineStateModel");
+
+ @Transition(from = "OFFLINE", to = "CONSUMING")
+ public void onBecomeConsumingFromOffline(Message message, NotificationContext context) {
+ _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message);
+ String realtimeTableName = message.getResourceName();
+ String segmentName = message.getPartitionName();
+
+ // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+// TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+// Preconditions.checkNotNull(tableType);
+// if (tableType == TableType.OFFLINE) {
+// _logger.info("OFFLINE->CONSUMING state transition called for OFFLINE table, treat this as a no-op");
+// return;
+// }
+
+ try {
+ _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
+ } catch (Exception e) {
+ String errorMessage =
+ String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s",
+ realtimeTableName, segmentName);
+ _logger.error(errorMessage, e);
+ TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+ if (tableDataManager != null) {
+ tableDataManager.addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ }
+ Utils.rethrowException(e);
+ }
+ }
+
+ @Transition(from = "CONSUMING", to = "ONLINE")
+ public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) {
+ _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message);
+ String realtimeTableName = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+ Preconditions.checkNotNull(tableType);
+
+ // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+// if (tableType == TableType.OFFLINE) {
+// try {
+// _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName);
+// } catch (Exception e) {
+// String errorMessage = String.format(
+// "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+// realtimeTableName, segmentName);
+// _logger.error(errorMessage, e);
+// TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+// if (tableDataManager != null) {
+// tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(),
+// errorMessage, e));
+// }
+// Utils.rethrowException(e);
+// }
+// } else {
+ TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+ Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+ tableDataManager.onConsumingToOnline(segmentName);
+ boolean isConsumingDone = false;
+ while (!isConsumingDone) {
+ SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName);
+ // For this transition to be correct in helix, we should already have a segment that is consuming
+ Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName,
+ realtimeTableName);
+
+ // TODO: https://github.com/apache/pinot/issues/10049
+ try {
+ // This indicates that the realtime segment is already a completed immutable segment.
+ // So nothing to do for this transition.
+ if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) {
+ // We found an LLC segment that is not consuming right now, must be that we already swapped it with a
+ // segment that has been built. Nothing to do for this state transition.
+ _logger.info(
+ "Segment {} not an instance of RealtimeSegmentDataManager. Reporting success for the transition",
+ acquiredSegment.getSegmentName());
+ return;
+ }
+ RealtimeSegmentDataManager segmentDataManager = (RealtimeSegmentDataManager) acquiredSegment;
+ RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+ if (state.shouldConsume()) {
+ Thread.sleep(10_000L);
+ } else {
+ SegmentZKMetadata segmentZKMetadata =
+ ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName,
+ segmentName);
+ segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
+ isConsumingDone = true;
+ }
+
+// // Use a single thread to wait for the consuming segment to be fully completed
+// ExecutorService executorService = Executors.newSingleThreadExecutor();
+// Future<Boolean> future = executorService.submit(() -> {
+// try {
+// while (true) {
+// RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+// if (state.shouldConsume()) {
+// Thread.sleep(5_000L);
+// } else {
+// segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
+// return true;
+// }
+//// RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+//// if (state.isFinal()) {
+//// return true;
+//// }
+// }
+// } catch (InterruptedException e) {
+// throw new RuntimeException(e);
+// }
+// });
+// future.get();
+// executorService.shutdown();
+ } catch (Exception e) {
+ String errorMessage =
+ String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+ realtimeTableName, segmentName);
+ _logger.error(errorMessage, e);
+ tableDataManager.addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ Utils.rethrowException(e);
+ } finally {
+ tableDataManager.releaseSegment(acquiredSegment);
+ }
+ }
+ }
+
+ @Transition(from = "CONSUMING", to = "OFFLINE")
+ public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
+ _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : " + message);
+ String realtimeTableName = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ try {
+ _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}",
+ realtimeTableName, segmentName, e);
+ Utils.rethrowException(e);
+ }
+ }
+
+ @Transition(from = "CONSUMING", to = "DROPPED")
+ public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) {
+ _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message);
+ String realtimeTableName = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+ Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+ tableDataManager.onConsumingToDropped(segmentName);
+ try {
+ _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+ _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}",
+ realtimeTableName, segmentName, e);
+ Utils.rethrowException(e);
+ }
+ }
+
+
+// @Transition(from = "OFFLINE", to = "ONLINE")
+// public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+// _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message);
+// String tableNameWithType = message.getResourceName();
+// String segmentName = message.getPartitionName();
+// TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+// Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME),
+// "TableType is null or is a REALTIME table, offline state model should not be called for RT");
+// try {
+// _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
+// } catch (Exception e) {
+// String errorMessage =
+// String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s",
+// tableNameWithType, segmentName);
+// _logger.error(errorMessage, e);
+// TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
+// if (tableDataManager != null) {
+// tableDataManager.addSegmentError(segmentName,
+// new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+// }
+// Utils.rethrowException(e);
+// }
+// }
+
+ // Remove segment from InstanceDataManager.
+ // Still keep the data files in local.
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+ _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
+ String tableNameWithType = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ try {
+ _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
+ tableNameWithType, segmentName, e);
+ Utils.rethrowException(e);
+ }
+ }
+
+ // Delete segment from local directory.
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+ _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
+ String tableNameWithType = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ try {
+ _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+ tableNameWithType, segmentName, e);
+ Utils.rethrowException(e);
+ }
+ }
+
+ @Transition(from = "ONLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+ _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
+ String tableNameWithType = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ try {
+ _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+ _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+ tableNameWithType, segmentName, e);
+ Utils.rethrowException(e);
+ }
+ }
+
+    /**
+     * State-transition handler for ERROR -> OFFLINE. Only logs the incoming message; no segment operation is
+     * performed.
+     *
+     * @param message transition message, included in the log line
+     * @param context notification context; not used by this handler
+     */
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      final String transitionLog = "RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message;
+      _logger.info(transitionLog);
+    }
+
+ @Transition(from = "ERROR", to = "DROPPED")
+ public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+ _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message);
+ String tableNameWithType = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ try {
+ _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+ tableNameWithType, segmentName, e);
+ Utils.rethrowException(e);
+ }
+ }
+ }
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
index 9d1dcd7e14..018ee3b6e8 100644
--- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
@@ -6,7 +6,7 @@
"timeColumnName": "DaysSinceEpoch",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "5",
- "replication": "1"
+ "replication": "3"
},
"tableIndexConfig": {},
"routing": {
@@ -26,7 +26,7 @@
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092",
"realtime.segment.flush.threshold.time": "3600000",
- "realtime.segment.flush.threshold.size": "50000"
+ "realtime.segment.flush.threshold.size": "500"
}
]
},
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org