You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2024/02/13 17:53:46 UTC
(pinot) 01/02: Initial POC code
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 9864022a2f0e9a7a8c5f2e3e6ab07ae89f658497
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Wed Jan 17 13:31:58 2024 -0800
Initial POC code
---
.../common/assignment/InstancePartitionsUtils.java | 11 +-
.../pinot/common/utils/helix/HelixHelper.java | 10 +-
.../pinot/controller/LeadControllerManager.java | 1 +
.../PinotInstanceAssignmentRestletResource.java | 4 +-
.../api/resources/PinotTenantRestletResource.java | 3 +-
...ineSegmentOnlineOfflineStateModelGenerator.java | 65 +++++++++
.../helix/core/PinotHelixResourceManager.java | 46 ++++--
...lixSegmentOnlineOfflineStateModelGenerator.java | 2 +-
.../helix/core/PinotTableIdealStateBuilder.java | 30 ++++
.../assignment/segment/SegmentAssignmentUtils.java | 8 +
.../realtime/PinotLLCRealtimeSegmentManager.java | 50 ++++---
.../helix/core/rebalance/TableRebalancer.java | 46 +++++-
.../helix/core/util/HelixSetupUtils.java | 19 +++
.../PinotLLCRealtimeSegmentManagerTest.java | 6 +-
.../integration/tests/HelixZNodeSizeLimitTest.java | 19 ++-
.../server/starter/helix/BaseServerStarter.java | 9 +-
...flineSegmentOnlineOfflineStateModelFactory.java | 162 +++++++++++++++++++++
.../SegmentOnlineOfflineStateModelFactory.java | 38 +++++
.../org/apache/pinot/tools/HybridQuickstart.java | 5 +-
.../tools/admin/command/MoveReplicaGroup.java | 17 ++-
.../tools/admin/command/QuickstartRunner.java | 2 +-
.../airlineStats_offline_table_config.json | 4 +-
22 files changed, 501 insertions(+), 56 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
index 759d387af4..f8bbd08934 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
@@ -23,7 +23,9 @@ import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -165,12 +167,19 @@ public class InstancePartitionsUtils {
* Persists the instance partitions to Helix property store.
*/
public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
- InstancePartitions instancePartitions) {
+ ConfigAccessor configAccessor, String helixClusterName, InstancePartitions instancePartitions) {
String path = ZKMetadataProvider
.constructPropertyStorePathForInstancePartitions(instancePartitions.getInstancePartitionsName());
if (!propertyStore.set(path, instancePartitions.toZNRecord(), AccessOption.PERSISTENT)) {
throw new ZkException("Failed to persist instance partitions: " + instancePartitions);
}
+
+ // Set the INSTANCE_PARTITIONS under the RESOURCE config (only modifying set path for now to ensure it works)
+ // This is just a test to see how to access and update the CONFIG/RESOURCES
+ String resourceName = "INSTANCE_PARTITION_" + instancePartitions.getInstancePartitionsName();
+ ResourceConfig resourceConfig = new ResourceConfig(resourceName);
+ resourceConfig.setPreferenceLists(instancePartitions.getPartitionToInstancesMap());
+ configAccessor.setResourceConfig(helixClusterName, resourceName, resourceConfig);
}
/**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index 4160dd44ef..8f76ba801f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -185,9 +185,12 @@ public class HelixHelper {
String partitionName = it.next();
int numChars = partitionName.length();
Map<String, String> stateMap = is.getInstanceStateMap(partitionName);
- for (Map.Entry<String, String> entry : stateMap.entrySet()) {
- numChars += entry.getKey().length();
- numChars += entry.getValue().length();
+ if (stateMap != null) {
+ // The stateMap might be NULL for FULL-AUTO segments, so always do this NULL check
+ for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+ numChars += entry.getKey().length();
+ numChars += entry.getValue().length();
+ }
}
numChars *= is.getNumPartitions();
if (_minNumCharsInISToTurnOnCompression > 0
@@ -200,6 +203,7 @@ public class HelixHelper {
});
return idealStateWrapper._idealState;
} catch (Exception e) {
+ LOGGER.error("Caught exception while updating ideal state for resource: " + resourceName, e);
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
index 5c595bae2a..a1a4f7c270 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
@@ -220,6 +220,7 @@ public class LeadControllerManager {
return;
}
+ LOGGER.info("************** onResourceConfigChange() fired ***************");
boolean leadControllerResourceEnabled;
try {
leadControllerResourceEnabled = LeadControllerUtils.isLeadControllerResourceEnabled(_helixManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 282431e04b..6fd157d620 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -301,7 +301,9 @@ public class PinotInstanceAssignmentRestletResource {
private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
try {
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
- InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(), instancePartitions);
+ InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(),
+ _resourceManager.getHelixZkManager().getConfigAccessor(), _resourceManager.getHelixClusterName(),
+ instancePartitions);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 8166427a93..1fea68c75f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -359,7 +359,8 @@ public class PinotTenantRestletResource {
try {
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
- instancePartitions);
+ _pinotHelixResourceManager.getHelixZkManager().getConfigAccessor(),
+ _pinotHelixResourceManager.getHelixClusterName(), instancePartitions);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java
new file mode 100644
index 0000000000..e4f93b3a52
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * Offline Segment state model generator describes the transitions for offline segment states.
+ *
+ * Online to Offline (Online to Dropped has no direct transition; only Offline to Dropped is registered)
+ * Offline to Online, Offline to Dropped
+ *
+ * This does not include the state transitions for realtime segments (which includes the CONSUMING state)
+ */
+public class PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator {
+ private PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator() {
+ }
+
+ public static final String PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL = "OfflineSegmentOnlineOfflineStateModel";
+
+ public static final String ONLINE_STATE = "ONLINE";
+ public static final String OFFLINE_STATE = "OFFLINE";
+ public static final String DROPPED_STATE = "DROPPED";
+
+ public static StateModelDefinition generatePinotStateModelDefinition() {
+ StateModelDefinition.Builder builder =
+ new StateModelDefinition.Builder(PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL);
+ builder.initialState(OFFLINE_STATE);
+
+ builder.addState(ONLINE_STATE);
+ builder.addState(OFFLINE_STATE);
+ builder.addState(DROPPED_STATE);
+ // NOTE: the initial state (used when the node starts) is set above via initialState(OFFLINE_STATE)
+
+ // Add transitions between the states.
+ builder.addTransition(OFFLINE_STATE, ONLINE_STATE);
+ builder.addTransition(ONLINE_STATE, OFFLINE_STATE);
+ builder.addTransition(OFFLINE_STATE, DROPPED_STATE);
+
+ // set constraints on states.
+ // upper bound for ONLINE replicas (a dynamic constraint, not a static one)
+ builder.dynamicUpperBound(ONLINE_STATE, "R");
+ // dynamic constraint, R means it should be derived based on the replication
+ // factor.
+
+ StateModelDefinition statemodelDefinition = builder.build();
+ return statemodelDefinition;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 35c942aba6..ae208715fd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1579,16 +1579,29 @@ public class PinotHelixResourceManager {
LOGGER.info("Adding table {}: Validate table configs", tableNameWithType);
validateTableTenantConfig(tableConfig);
- IdealState idealState =
- PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
- _enableBatchMessageMode);
TableType tableType = tableConfig.getTableType();
+ Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
+ "Invalid table type: %s", tableType);
+
+ IdealState idealState;
+ if (tableType == TableType.REALTIME) {
+ idealState =
+ PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+ _enableBatchMessageMode);
+ } else {
+ // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only
+ idealState =
+ PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+ _enableBatchMessageMode);
+ }
+ // IdealState idealState =
+ // PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(),
+ // _enableBatchMessageMode);
+
// Ensure that table is not created if schema is not present
if (ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType)) == null) {
throw new InvalidTableConfigException("No schema defined for table: " + tableNameWithType);
}
- Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME,
- "Invalid table type: %s", tableType);
// Add table config
LOGGER.info("Adding table {}: Creating table config in the property store", tableNameWithType);
@@ -1785,7 +1798,8 @@ public class PinotHelixResourceManager {
referenceInstancePartitionsName);
}
}
- InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
+ InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(),
+ _helixClusterName, instancePartitions);
}
}
@@ -1803,7 +1817,8 @@ public class PinotHelixResourceManager {
instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null,
tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
- InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
+ InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(),
+ _helixClusterName, instancePartitions);
}
}
}
@@ -2243,7 +2258,8 @@ public class PinotHelixResourceManager {
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
assert idealState != null;
Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
- if (currentAssignment.containsKey(segmentName)) {
+ Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
+ if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) {
LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
tableNameWithType);
} else {
@@ -2251,8 +2267,18 @@ public class PinotHelixResourceManager {
segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
tableNameWithType);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == TableType.REALTIME) {
+ // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated
+ currentAssignment.put(segmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
+ } else {
+ // TODO: Assess whether to pass in an empty instance list or to set the preferred list
+ currentAssignmentList.put(segmentName, Collections.emptyList()
+ /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
+ }
+ // currentAssignment.put(segmentName,
+ // SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
}
return idealState;
});
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
index 18b6ac0a75..3e52de0359 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java
@@ -69,7 +69,7 @@ public class PinotHelixSegmentOnlineOfflineStateModelGenerator {
builder.dynamicUpperBound(ONLINE_STATE, "R");
// dynamic constraint, R means it should be derived based on the replication
// factor.
- builder.dynamicUpperBound(CONSUMING_STATE, "R");
+ // builder.dynamicUpperBound(CONSUMING_STATE, "R");
StateModelDefinition statemodelDefinition = builder.build();
return statemodelDefinition;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 23a115417f..63222df7e3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -19,8 +19,10 @@
package org.apache.pinot.controller.helix.core;
import java.util.List;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
@@ -41,6 +43,7 @@ public class PinotTableIdealStateBuilder {
public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int numReplicas,
boolean enableBatchMessageMode) {
+ LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType);
customModeIdealStateBuilder
.setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
@@ -51,6 +54,33 @@ public class PinotTableIdealStateBuilder {
return idealState;
}
+ public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas,
+ boolean enableBatchMessageMode) {
+ LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+ // FULL-AUTO Segment Online-Offline state model with a rebalance strategy (CrushEdRebalanceStrategy for now)
+ // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too
+ FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
+ idealStateBuilder
+ .setStateModel(
+ PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+ .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
+ // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
+ .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+ // The below config guarantees that as long as the number of active replicas is no less than the minimum active
+ // replica count, no partition movement happens.
+ // Set min active replicas to (numReplicas - 1) and rebalance delay to 5 minutes so that if any instance goes
+ // offline, the Helix controller waits at most 5 minutes and then re-calculates the participant assignment.
+ // TODO: Assess which of these values need to be tweaked or removed, and which additional values need to be added
+ idealStateBuilder.setMinActiveReplica(numReplicas - 1);
+ idealStateBuilder.setRebalanceDelay(300_000);
+ idealStateBuilder.enableDelayRebalance();
+ // Set instance group tag
+ IdealState idealState = idealStateBuilder.build();
+ idealState.setInstanceGroupTag(tableNameWithType);
+ idealState.setBatchMessageMode(enableBatchMessageMode);
+ return idealState;
+ }
+
/**
* Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream,
* with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 52f736a555..46e4cc4eca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -272,6 +273,13 @@ public class SegmentAssignmentUtils {
return instanceStateMap;
}
+ public static List<String> getInstanceStateList(Collection<String> instances) {
+ List<String> instanceStateList = new ArrayList<>();
+ instanceStateList.addAll(instances);
+ Collections.sort(instanceStateList);
+ return instanceStateList;
+ }
+
/**
* Returns a map from instance name to number of segments to be moved to it.
*/
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 25e40084ab..003f985e3f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -332,12 +332,13 @@ public class PinotLLCRealtimeSegmentManager {
long currentTimeMs = getCurrentTimeMs();
Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+ Map<String, List<String>> instancesStateList = idealState.getRecord().getListFields();
for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
String segmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
numPartitionGroups, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
- instancePartitionsMap);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, segmentName,
+ segmentAssignment, instancePartitionsMap);
}
setIdealState(realtimeTableName, idealState);
@@ -818,6 +819,7 @@ public class PinotLLCRealtimeSegmentManager {
try {
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
+ // TODO: how to handle such state updates for FULL-AUTO mode? So far we don't enable FULL-AUTO for REALTIME
Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName);
String state = stateMap.get(instanceName);
if (SegmentStateModel.CONSUMING.equals(state)) {
@@ -972,7 +974,8 @@ public class PinotLLCRealtimeSegmentManager {
throw new HelixHelper.PermanentUpdaterException(
"Exceeded max segment completion time for segment " + committingSegmentName);
}
- updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
+ updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
+ idealState.getRecord().getListFields(), committingSegmentName,
isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
@@ -984,8 +987,10 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
+ Map<String, List<String>> instanceStatesList,
@Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+ // TODO: Need to figure out the best way to handle committed segments' state change
if (committingSegmentName != null) {
// Change committing segment state to ONLINE
Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
@@ -1023,8 +1028,15 @@ public class PinotLLCRealtimeSegmentManager {
// Assign instances to the new segment and add instances as state CONSUMING
List<String> instancesAssigned =
segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+ // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
instanceStatesMap.put(newSegmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+ // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list.
+ // Assess whether we should set an empty InstanceStateList for the segment or not. i.e. do we support
+ // this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it
+ // looks like it does)
+ // instanceStatesList.put(newSegmentName, Collections.emptyList()
+ // /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
}
}
@@ -1116,6 +1128,7 @@ public class PinotLLCRealtimeSegmentManager {
Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+ Map<String, List<String>> instanceStatesList = idealState.getRecord().getListFields();
long currentTimeMs = getCurrentTimeMs();
StreamPartitionMsgOffsetFactory offsetFactory =
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
@@ -1181,14 +1194,14 @@ public class PinotLLCRealtimeSegmentManager {
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
- segmentAssignment, instancePartitionsMap);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName,
+ newSegmentName, segmentAssignment, instancePartitionsMap);
} else { // partition group reached end of life
LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
+ "Skipping creation of new ZK metadata and new segment in ideal state", partitionGroupId,
latestSegmentName);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
- instancePartitionsMap);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName,
+ null, segmentAssignment, instancePartitionsMap);
}
}
// else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
@@ -1212,8 +1225,8 @@ public class PinotLLCRealtimeSegmentManager {
partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
- newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
- instancePartitionsMap, startOffset);
+ newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
+ segmentAssignment, instancePartitionsMap, startOffset);
} else {
if (newPartitionGroupSet.contains(partitionGroupId)) {
if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
@@ -1231,8 +1244,8 @@ public class PinotLLCRealtimeSegmentManager {
partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getEndOffset());
createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
- newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
- instancePartitionsMap, startOffset);
+ newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList,
+ segmentAssignment, instancePartitionsMap, startOffset);
} else {
LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
latestSegmentName);
@@ -1269,8 +1282,8 @@ public class PinotLLCRealtimeSegmentManager {
partitionGroupId, realtimeTableName);
_controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
}
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName,
- segmentAssignment, instancePartitionsMap);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, previousConsumingSegment,
+ latestSegmentName, segmentAssignment, instancePartitionsMap);
} else {
LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}",
latestSegmentZKMetadata.getStatus(), latestSegmentName);
@@ -1285,8 +1298,8 @@ public class PinotLLCRealtimeSegmentManager {
String newSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
numPartitions, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
- instancePartitionsMap);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, null, newSegmentName,
+ segmentAssignment, instancePartitionsMap);
}
}
@@ -1296,7 +1309,8 @@ public class PinotLLCRealtimeSegmentManager {
private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig,
SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
- Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
+ Map<String, Map<String, String>> instanceStatesMap, Map<String, List<String>> instancesStateList,
+ SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
int numPartitions = newPartitionGroupMetadataList.size();
@@ -1307,8 +1321,8 @@ public class PinotLLCRealtimeSegmentManager {
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
String newSegmentName = newLLCSegmentName.getSegmentName();
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
- instancePartitionsMap);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, newSegmentName,
+ segmentAssignment, instancePartitionsMap);
}
private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 5c3514fa7e..f97aa7b06a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.rebalance;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -158,6 +159,16 @@ public class TableRebalancer {
}
}
+  /**
+   * Converts a segment assignment (segment name to instance-state map) into the shape accepted by the ideal state
+   * record's list fields (segment name to list of instances).
+   *
+   * @param targetAssignment assignment to convert; each value maps an instance to its state
+   * @param setInstances when true, each segment is mapped to the instances of its assignment entry; when false,
+   *                     each segment is mapped to an empty list
+   * @return a new map with one entry per key of {@code targetAssignment}
+   */
+  private Map<String, List<String>> convertTargetAssignmentToListFields(
+      Map<String, Map<String, String>> targetAssignment, boolean setInstances) {
+    Map<String, List<String>> listFields = new HashMap<>();
+    targetAssignment.forEach((segmentName, instanceStateMap) -> {
+      if (setInstances) {
+        listFields.put(segmentName, new ArrayList<>(instanceStateMap.keySet()));
+      } else {
+        listFields.put(segmentName, Collections.emptyList());
+      }
+    });
+    return listFields;
+  }
+
private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig,
@Nullable String rebalanceJobId) {
long startTimeMs = System.currentTimeMillis();
@@ -299,8 +310,19 @@ public class TableRebalancer {
LOGGER.info("For rebalanceId: {}, rebalancing table: {} with downtime", rebalanceJobId, tableNameWithType);
// Reuse current IdealState to update the IdealState in cluster
+ // TODO: Assess how rebalance will change if we use FULL-AUTO mode. In FULL-AUTO mode the mapFields should not
+ // be set up at all. Maybe we don't even need this TableRebalancer?
+ TableType tableType = tableConfig.getTableType();
ZNRecord idealStateRecord = currentIdealState.getRecord();
- idealStateRecord.setMapFields(targetAssignment);
+ if (tableType == TableType.REALTIME) {
+ // TODO: Only set listFields once REALTIME tables use FULL-AUTO
+ idealStateRecord.setMapFields(targetAssignment);
+ } else {
+ // TODO: Assess whether we should set the preferred host list or not, for now setting empty list
+ Map<String, List<String>> listFieldsConverted = convertTargetAssignmentToListFields(targetAssignment, false);
+ idealStateRecord.setListFields(listFieldsConverted);
+ }
+ // idealStateRecord.setMapFields(targetAssignment);
currentIdealState.setNumPartitions(targetAssignment.size());
currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size()));
@@ -496,7 +518,18 @@ public class TableRebalancer {
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));
// Reuse current IdealState to update the IdealState in cluster
- idealStateRecord.setMapFields(nextAssignment);
+      // TODO: Assess how rebalance will change if we use FULL-AUTO mode. In FULL-AUTO mode the mapFields should not
+      // be set up at all. Maybe we don't even need this TableRebalancer?
+      // Write the assignment computed for this step (nextAssignment), not the final target, so the record stays
+      // consistent with the partition and replica counts derived from nextAssignment right below.
+      TableType tableType = tableConfig.getTableType();
+      if (tableType == TableType.REALTIME) {
+        // TODO: Only set listFields once REALTIME tables use FULL-AUTO
+        idealStateRecord.setMapFields(nextAssignment);
+      } else {
+        // TODO: Assess whether we should set the preferred host list or not, for now setting empty list
+        Map<String, List<String>> listFieldsConverted = convertTargetAssignmentToListFields(nextAssignment, false);
+        idealStateRecord.setListFields(listFieldsConverted);
+      }
idealState.setNumPartitions(nextAssignment.size());
idealState.setReplicas(Integer.toString(nextAssignment.values().iterator().next().size()));
@@ -598,7 +631,7 @@ public class TableRebalancer {
if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
- instancePartitions);
+ _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
}
} else {
String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
@@ -614,7 +647,7 @@ public class TableRebalancer {
LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions,
preConfiguredInstancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
- instancePartitions);
+ _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
}
} else {
instancePartitions =
@@ -625,7 +658,7 @@ public class TableRebalancer {
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
- instancePartitions);
+ _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
}
}
}
@@ -730,7 +763,8 @@ public class TableRebalancer {
boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
- InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
+ InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
+ _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions);
}
return Pair.of(instancePartitions, instancePartitionsUnchanged);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 8d21d18b1f..61f6112365 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -31,6 +31,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
@@ -45,6 +46,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
@@ -79,6 +81,8 @@ public class HelixSetupUtils {
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
Map<String, String> configMap = new HashMap<>();
configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.toString(true));
+ configMap.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
+ Boolean.toString(true));
configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(DEFAULT_ENABLE_CASE_INSENSITIVE));
configMap.put(DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M));
configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.toString(false));
@@ -147,6 +151,21 @@ public class HelixSetupUtils {
helixDataAccessor
.createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
}
+
+    // Register (or refresh) the state model definition used by OFFLINE table segments.
+    String offlineSegmentStateModelName =
+        PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName,
+        offlineSegmentStateModelName);
+    if (offlineStateModelDefinition == null || isUpdateStateModel) {
+      // Pick the log message from the offline model's own lookup result, not the segment model's.
+      // NOTE(review): both messages mention CONSUMING - confirm the offline state model actually defines it.
+      if (offlineStateModelDefinition == null) {
+        LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName);
+      } else {
+        LOGGER.info("Updating offline segment state model: {} to contain CONSUMING state",
+            offlineSegmentStateModelName);
+      }
+      helixDataAccessor.createStateModelDef(
+          PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+    }
}
private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 60b83ba24a..a882a47ec7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1221,9 +1221,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
- updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null,
+ updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
+ _idealState.getRecord().getListFields(), committingSegmentName, null,
segmentAssignment, instancePartitionsMap);
- updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName,
+ updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
+ _idealState.getRecord().getListFields(), null, newSegmentName,
segmentAssignment, instancePartitionsMap);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
index 37d3aa6ca3..9dec9df6b6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java
@@ -19,7 +19,8 @@
package org.apache.pinot.integration.tests;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
import java.util.Map;
import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -83,15 +84,23 @@ public class HelixZNodeSizeLimitTest extends BaseClusterIntegrationTest {
try {
HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+ Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
for (int i = 0; i < 500_000; i++) {
- currentAssignment.put("segment_" + i,
- ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 10), "ONLINE"));
- currentAssignment.put("segment_" + i,
- ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 9), "ONLINE"));
+ // currentAssignment.put("segment_" + i,
+ // ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 10), "ONLINE"));
+ // currentAssignment.put("segment_" + i,
+ // ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 9), "ONLINE"));
+ currentAssignmentList.put("segment_" + i,
+ ImmutableList.of("Server_with_some_reasonable_long_prefix_" + (i % 10)));
+ currentAssignmentList.put("segment_" + i,
+ ImmutableList.of("Server_with_some_reasonable_long_prefix_" + (i % 9)));
}
return idealState;
});
} catch (Exception e) {
+ System.out.println("exception: " + e);
+ System.out.println("exception: " + e.getMessage());
+ System.out.println("exception: " + e.getCause());
Assert.fail("Exception shouldn't be thrown even if the data size of the ideal state is larger than 1M");
}
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 2b771707d2..6e0d157075 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -579,10 +579,15 @@ public abstract class BaseServerStarter implements ServiceStartable {
Tracing.ThreadAccountantOps
.initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
initSegmentFetcher(_serverConf);
- StateModelFactory<?> stateModelFactory =
+ StateModelFactory<?> stateModelFactoryWithRealtime =
new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+ StateModelFactory<?> stateModelFactory =
+ new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
+ _helixManager.getStateMachineEngine()
+ .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
_helixManager.getStateMachineEngine()
- .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
+ .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+ stateModelFactoryWithRealtime);
// Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
// the property store, but before receiving state transitions
_helixManager.addPreConnectCallback(_serverInstance::startDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..0a0608b44d
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Data Server layer state model that controls how the server handles:
+ * 1. Adding a new segment.
+ * 2. Refreshing an existing segment that is currently being served.
+ * 3. Deleting an existing segment.
+ *
+ * This only works for OFFLINE table segments and does not handle the CONSUMING state at all.
+ */
+public class OfflineSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private final String _instanceId;
+  private final InstanceDataManager _instanceDataManager;
+
+  /**
+   * @param instanceId id of this server instance; used to name the logger of every created state model
+   * @param instanceDataManager data manager that the state transitions delegate segment operations to
+   */
+  public OfflineSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+    _instanceId = instanceId;
+    _instanceDataManager = instanceDataManager;
+  }
+
+  /**
+   * @return the name of the state model served by this factory
+   */
+  public static String getStateModelName() {
+    return "OfflineSegmentOnlineOfflineStateModel";
+  }
+
+  /**
+   * Creates a fresh state model; the resource and partition names are not used.
+   */
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    return new OfflineSegmentOnlineOfflineStateModel();
+  }
+
+
+  // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field.
+  // The transitions in the helix messages indicate the from/to states, and helix uses the
+  // Transition annotations (but only if StateModelInfo is defined).
+  @SuppressWarnings("unused")
+  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+  public class OfflineSegmentOnlineOfflineStateModel extends StateModel {
+    private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - OfflineSegmentOnlineOfflineStateModel");
+
+    /**
+     * OFFLINE -> ONLINE: adds or replaces the segment through the instance data manager. On failure the error is
+     * logged, recorded against the segment when the table data manager exists, and then rethrown.
+     *
+     * @throws IllegalArgumentException if the resource name carries no table type or belongs to a REALTIME table
+     */
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME),
+          "TableType is null or is a REALTIME table, offline state model should not be called for REALTIME tables");
+      try {
+        _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s",
+                tableNameWithType, segmentName);
+        _logger.error(errorMessage, e);
+        // Record the failure on the segment before propagating it
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
+        if (tableDataManager != null) {
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Remove segment from InstanceDataManager.
+    // Still keep the data files in local.
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Delete segment from local directory.
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // ONLINE -> DROPPED: offload the segment first, then delete it.
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // ERROR -> OFFLINE: only logs the message; no segment operation is performed.
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : {}", message);
+    }
+
+    // ERROR -> DROPPED: delete the segment through the instance data manager.
+    @Transition(from = "ERROR", to = "DROPPED")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : {}", message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 42d1642c7b..76b6e779dc 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -44,6 +44,14 @@ import org.slf4j.LoggerFactory;
* 1. Add a new segment
* 2. Refresh an existed now serving segment.
* 3. Delete an existed segment.
+ *
+ * TODO: Assess how to handle this state model for FULL-AUTO. Today we have states such as A -> B, B -> C and A -> C.
+ * FULL-AUTO optimizes such transitions and only calls A -> C in such cases. Due to semantics we need to allow
+ * REALTIME segments to directly move from OFFLINE -> ONLINE for completed segments which violates what FULL-AUTO
+ * does. Due to this limitation we never see transitions from OFFLINE -> CONSUMING even though we need this
+ * transition for all new CONSUMING segments.
+ * To unblock the POC, for now we have moved the OFFLINE segment state model to a different
+ * class: OfflineSegmentOnlineOfflineStateModelFactory
*/
public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
private final String _instanceId;
@@ -76,6 +84,15 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
_logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message);
String realtimeTableName = message.getResourceName();
String segmentName = message.getPartitionName();
+
+ // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+// TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+// Preconditions.checkNotNull(tableType);
+// if (tableType == TableType.OFFLINE) {
+// _logger.info("OFFLINE->CONSUMING state transition called for OFFLINE table, treat this as a no-op");
+// return;
+// }
+
try {
_instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
} catch (Exception e) {
@@ -97,6 +114,26 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
_logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message);
String realtimeTableName = message.getResourceName();
String segmentName = message.getPartitionName();
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+ Preconditions.checkNotNull(tableType);
+
+ // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now
+// if (tableType == TableType.OFFLINE) {
+// try {
+// _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName);
+// } catch (Exception e) {
+// String errorMessage = String.format(
+// "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+// realtimeTableName, segmentName);
+// _logger.error(errorMessage, e);
+// TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+// if (tableDataManager != null) {
+// tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(),
+// errorMessage, e));
+// }
+// Utils.rethrowException(e);
+// }
+// } else {
TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
tableDataManager.onConsumingToOnline(segmentName);
@@ -131,6 +168,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
tableDataManager.releaseSegment(acquiredSegment);
}
}
+// }
@Transition(from = "CONSUMING", to = "OFFLINE")
public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index e5bd332c2b..dc8285472a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -51,7 +51,8 @@ public class HybridQuickstart extends Quickstart {
public Map<String, Object> getConfigOverrides() {
Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides());
overrides.put("pinot.server.grpc.enable", "true");
- overrides.put("pinot.server.grpc.port", "8090");
+ // Commenting out the below to allow running more than 1 server
+ // overrides.put("pinot.server.grpc.port", "8090");
return overrides;
}
@@ -114,7 +115,7 @@ public class HybridQuickstart extends Quickstart {
quickstartTableRequests.addAll(bootstrapOfflineTableDirectories(quickstartTmpDir));
quickstartTableRequests.addAll(bootstrapStreamTableDirectories(quickstartTmpDir));
final QuickstartRunner runner =
- new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 1, 1, quickstartRunnerDir,
+ new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 4, 1, quickstartRunnerDir,
getConfigOverrides());
startKafka();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
index 4ece1f5abf..b14cb9e240 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
@@ -29,6 +29,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -45,6 +46,7 @@ import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -210,9 +212,22 @@ public class MoveReplicaGroup extends AbstractBaseAdminCommand implements Comman
@Override
public IdealState apply(@Nullable IdealState input) {
Map<String, Map<String, String>> existingMapField = input.getRecord().getMapFields();
+ Map<String, List<String>> existingListField = input.getRecord().getListFields();
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableName);
for (Map.Entry<String, Map<String, String>> segmentEntry : proposedIdealState.entrySet()) {
- existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+ // existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+ if (tableType == TableType.REALTIME) {
+ // TODO: Update listField only once REALTIME uses FULL-AUTO
+ existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue());
+ } else {
+ String segmentName = segmentEntry.getKey();
+ Map<String, String> segmentMapping = segmentEntry.getValue();
+ List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet());
+ Collections.sort(listOfHosts);
+ // TODO: Assess if we want to add the preferred list of hosts or not
+ existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */);
+ }
}
return input;
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index e1e80e8ae0..310b6dc296 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -157,7 +157,7 @@ public class QuickstartRunner {
.setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME)
.setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath())
.setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath())
- .setConfigOverrides(_configOverrides);
+ .setConfigOverrides(_configOverrides).setMultiStageServerPort(8040 + i).setMultiStageRunnerPort(8100 + i);
if (!serverStarter.execute()) {
throw new RuntimeException("Failed to start Server");
}
diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
index 856719e058..12d29fc83d 100644
--- a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
+++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
@@ -6,7 +6,7 @@
"timeType": "DAYS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
- "replication": "1"
+ "replication": "3"
},
"tenants": {},
"fieldConfigList": [
@@ -44,7 +44,7 @@
"coldTier": {
"encodingType": "RAW",
"indexes": {
- "text": {
+ "inverted": {
"enabled": "true"
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org