You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/05/29 18:39:58 UTC
[incubator-pinot] branch master updated: [Cleanup] Merge
RealtimeSegmentOnlineOfflineStateModel and SegmentOnlineOfflineStateModel
in CommonConstants (#5459)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6bfcacb [Cleanup] Merge RealtimeSegmentOnlineOfflineStateModel and SegmentOnlineOfflineStateModel in CommonConstants (#5459)
6bfcacb is described below
commit 6bfcacb239f55c27491c2a580bf4a63cf2ec88fe
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri May 29 11:39:44 2020 -0700
[Cleanup] Merge RealtimeSegmentOnlineOfflineStateModel and SegmentOnlineOfflineStateModel in CommonConstants (#5459)
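There is only one segment state model, so there is no value in keeping both RealtimeSegmentOnlineOfflineStateModel and SegmentOnlineOfflineStateModel; they are merged into a single SegmentStateModel. This change also renames BrokerOnlineOfflineStateModel to BrokerResourceStateModel.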
---
.../HelixExternalViewBasedQueryQuotaManager.java | 4 +-
.../pinot/broker/routing/RoutingManager.java | 6 +--
.../instanceselector/BaseInstanceSelector.java | 6 +--
.../segmentselector/RealtimeSegmentSelector.java | 4 +-
.../instanceselector/InstanceSelectorTest.java | 8 ++--
.../segmentselector/SegmentSelectorTest.java | 4 +-
.../apache/pinot/common/utils/CommonConstants.java | 10 +---
.../helix/core/PinotHelixResourceManager.java | 20 ++++----
.../segment/OfflineSegmentAssignment.java | 6 +--
.../segment/RealtimeSegmentAssignment.java | 14 +++---
.../assignment/segment/SegmentAssignmentUtils.java | 13 +++---
.../realtime/PinotLLCRealtimeSegmentManager.java | 39 ++++++++--------
.../helix/core/rebalance/TableRebalancer.java | 6 +--
.../helix/core/retention/RetentionManager.java | 3 +-
...fflineNonReplicaGroupSegmentAssignmentTest.java | 14 +++---
.../OfflineReplicaGroupSegmentAssignmentTest.java | 26 +++++------
...altimeNonReplicaGroupSegmentAssignmentTest.java | 19 ++++----
.../RealtimeReplicaGroupSegmentAssignmentTest.java | 19 ++++----
.../segment/SegmentAssignmentUtilsTest.java | 8 ++--
.../PinotLLCRealtimeSegmentManagerTest.java | 54 +++++++++++-----------
.../core/rebalance/TableRebalancerClusterTest.java | 2 +-
.../helix/core/rebalance/TableRebalancerTest.java | 8 ++--
.../ControllerPeriodicTasksIntegrationTest.java | 8 ++--
.../server/starter/helix/HelixServerStarter.java | 5 +-
24 files changed, 148 insertions(+), 158 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 172b13a..3306e58 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -168,7 +168,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
if (stateMap != null) {
for (Map.Entry<String, String> state : stateMap.entrySet()) {
if (!_helixManager.getInstanceName().equals(state.getKey()) && state.getValue()
- .equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE)) {
+ .equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) {
otherOnlineBrokerCount++;
}
}
@@ -304,7 +304,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
int otherOnlineBrokerCount = 0;
for (Map.Entry<String, String> state : stateMap.entrySet()) {
if (!_helixManager.getInstanceName().equals(state.getKey()) && state.getValue()
- .equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE)) {
+ .equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) {
otherOnlineBrokerCount++;
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
index 62eece4..e8bf81f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
@@ -51,7 +51,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.spi.config.table.QueryConfig;
@@ -195,8 +195,8 @@ public class RoutingManager implements ClusterChangeHandler {
Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
Map<String, String> instanceStateMap = entry.getValue();
- if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE) || instanceStateMap
- .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap
+ .containsValue(SegmentStateModel.CONSUMING)) {
onlineSegments.add(entry.getKey());
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 404bfd6..512f721 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -31,7 +31,7 @@ import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.HashUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,9 +153,9 @@ abstract class BaseInstanceSelector implements InstanceSelector {
String instance = instanceStateEntry.getKey();
String state = instanceStateEntry.getValue();
// Do not track instances in ERROR state
- if (!state.equals(RealtimeSegmentOnlineOfflineStateModel.ERROR)) {
+ if (!state.equals(SegmentStateModel.ERROR)) {
_instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment);
- if (state.equals(RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) {
+ if (state.equals(SegmentStateModel.OFFLINE)) {
offlineInstances.add(instance);
} else {
onlineInstances.add(instance);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
index b89961a..a052da2 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
@@ -28,7 +28,7 @@ import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
@@ -90,7 +90,7 @@ public class RealtimeSegmentSelector implements SegmentSelector {
HLCSegmentName hlcSegmentName = new HLCSegmentName(segment);
groupIdToHLCSegmentsMap.computeIfAbsent(hlcSegmentName.getGroupId(), k -> new ArrayList<>()).add(segment);
} else {
- if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
// Keep the first CONSUMING segment for each partition
LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionId(), (k, consumingSegment) -> {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 8748f77..3806a27 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -34,10 +34,10 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.testng.annotations.Test;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ERROR;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.OFFLINE;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
index 9da10c0..b447bb6 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
@@ -32,8 +32,8 @@ import org.testng.annotations.Test;
import static org.apache.pinot.broker.routing.segmentselector.RealtimeSegmentSelector.FORCE_HLC;
import static org.apache.pinot.broker.routing.segmentselector.RealtimeSegmentSelector.ROUTING_OPTIONS_KEY;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEqualsNoOrder;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 18a4bf2..bff81f4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -56,20 +56,14 @@ public class CommonConstants {
public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";
public static class StateModel {
- public static class SegmentOnlineOfflineStateModel {
- public static final String ONLINE = "ONLINE";
- public static final String OFFLINE = "OFFLINE";
- public static final String ERROR = "ERROR";
- }
-
- public static class RealtimeSegmentOnlineOfflineStateModel {
+ public static class SegmentStateModel {
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";
public static final String ERROR = "ERROR";
public static final String CONSUMING = "CONSUMING";
}
- public static class BrokerOnlineOfflineStateModel {
+ public static class BrokerResourceStateModel {
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";
public static final String ERROR = "ERROR";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ec96836..6674aff 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -72,8 +72,8 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants.Helix;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnlineOfflineStateModel;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
@@ -95,15 +95,15 @@ import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
-import org.apache.pinot.spi.config.instance.Instance;
-import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -631,7 +631,7 @@ public class PinotHelixResourceManager {
instanceStateMap.clear();
}
for (String brokerInstance : brokerInstances) {
- idealState.setPartitionState(tableNameWithType, brokerInstance, BrokerOnlineOfflineStateModel.ONLINE);
+ idealState.setPartitionState(tableNameWithType, brokerInstance, BrokerResourceStateModel.ONLINE);
}
return idealState;
}, DEFAULT_RETRY_POLICY);
@@ -651,7 +651,7 @@ public class PinotHelixResourceManager {
Preconditions.checkNotNull(tableConfig);
String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
if (brokerTag.equals(brokerTenantTag)) {
- tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerOnlineOfflineStateModel.ONLINE);
+ tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerResourceStateModel.ONLINE);
}
}
_helixAdmin.setResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
@@ -1141,8 +1141,8 @@ public class PinotHelixResourceManager {
List<String> brokers = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
HelixHelper.updateIdealState(_helixZkManager, Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
assert idealState != null;
- idealState.getRecord().getMapFields().put(tableNameWithType,
- SegmentAssignmentUtils.getInstanceStateMap(brokers, BrokerOnlineOfflineStateModel.ONLINE));
+ idealState.getRecord().getMapFields()
+ .put(tableNameWithType, SegmentAssignmentUtils.getInstanceStateMap(brokers, BrokerResourceStateModel.ONLINE));
return idealState;
});
}
@@ -1563,7 +1563,7 @@ public class PinotHelixResourceManager {
LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
offlineTableName);
currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentOnlineOfflineStateModel.ONLINE));
+ SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
}
return idealState;
});
@@ -2090,7 +2090,7 @@ public class PinotHelixResourceManager {
_helixAdmin.enableInstance(_helixClusterName, instanceName, enableInstance);
long intervalWaitTimeMs = 500L;
long deadline = System.currentTimeMillis() + timeOutMs;
- String offlineState = SegmentOnlineOfflineStateModel.OFFLINE;
+ String offlineState = SegmentStateModel.OFFLINE;
while (System.currentTimeMillis() < deadline) {
PropertyKey liveInstanceKey = _keyBuilder.liveInstance(instanceName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 166f2a7..3e02759 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -33,7 +33,7 @@ import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -179,8 +179,8 @@ public class OfflineSegmentAssignment implements SegmentAssignment {
newAssignment = new TreeMap<>();
for (String segment : currentAssignment.keySet()) {
List<String> assignedInstances = assignSegment(segment, newAssignment, instancePartitions);
- newAssignment.put(segment,
- SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentOnlineOfflineStateModel.ONLINE));
+ newAssignment
+ .put(segment, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
}
} else {
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 177b6ee..22a5f0a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -29,7 +29,7 @@ import java.util.TreeMap;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -208,8 +208,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
newAssignment = new TreeMap<>();
for (String segment : completedSegmentAssignment.keySet()) {
List<String> assignedInstances = assignCompletedSegment(segment, newAssignment, completedInstancePartitions);
- newAssignment.put(segment, SegmentAssignmentUtils
- .getInstanceStateMap(assignedInstances, RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ newAssignment
+ .put(segment, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE));
}
} else {
if (completedInstancePartitions.getNumReplicaGroups() == 1) {
@@ -252,8 +252,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
newAssignment = new TreeMap<>();
for (String segmentName : completedSegmentAssignment.keySet()) {
List<String> instancesAssigned = assignConsumingSegment(segmentName, consumingInstancePartitions);
- Map<String, String> instanceStateMap = SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ Map<String, String> instanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE);
newAssignment.put(segmentName, instanceStateMap);
}
}
@@ -267,8 +267,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
for (String segmentName : consumingSegmentAssignment.keySet()) {
List<String> instancesAssigned = assignConsumingSegment(segmentName, consumingInstancePartitions);
- Map<String, String> instanceStateMap = SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ Map<String, String> instanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING);
newAssignment.put(segmentName, instanceStateMap);
}
} else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 1b301ad..edf8553 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -30,8 +30,7 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.Pairs;
@@ -142,7 +141,7 @@ public class SegmentAssignmentUtils {
Map<String, Map<String, String>> currentAssignment, List<String> instances, int replication) {
// Use Helix AutoRebalanceStrategy to rebalance the table
LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
- states.put(SegmentOnlineOfflineStateModel.ONLINE, replication);
+ states.put(SegmentStateModel.ONLINE, replication);
AutoRebalanceStrategy autoRebalanceStrategy =
new AutoRebalanceStrategy(null, new ArrayList<>(currentAssignment.keySet()), states);
// Make a copy of the current assignment because this step might change the passed in assignment
@@ -252,8 +251,8 @@ public class SegmentAssignmentUtils {
Map<String, String> instanceStateMap = new TreeMap<>();
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
- instanceStateMap.put(instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceId),
- SegmentOnlineOfflineStateModel.ONLINE);
+ instanceStateMap
+ .put(instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceId), SegmentStateModel.ONLINE);
}
return instanceStateMap;
}
@@ -305,9 +304,9 @@ public class SegmentAssignmentUtils {
for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
- if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
_completedSegmentAssignment.put(segmentName, instanceStateMap);
- } else if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ } else if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
_consumingSegmentAssignment.put(segmentName, instanceStateMap);
} else {
_offlineSegmentAssignment.put(segmentName, instanceStateMap);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 2325e92..8cae0ec 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -49,7 +49,7 @@ import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
@@ -412,9 +412,9 @@ public class PinotLLCRealtimeSegmentManager {
TableConfig tableConfig = getTableConfig(realtimeTableName);
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
IdealState idealState = getIdealState(realtimeTableName);
- Preconditions.checkState(idealState.getInstanceStateMap(committingSegmentName)
- .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING),
- "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+ Preconditions
+ .checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+ "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
int numPartitions = getNumPartitionsFromIdealState(idealState);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
@@ -608,11 +608,11 @@ public class PinotLLCRealtimeSegmentManager {
assert idealState != null;
Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName);
String state = stateMap.get(instanceName);
- if (RealtimeSegmentOnlineOfflineStateModel.CONSUMING.equals(state)) {
- stateMap.put(instanceName, RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ if (SegmentStateModel.CONSUMING.equals(state)) {
+ stateMap.put(instanceName, SegmentStateModel.OFFLINE);
} else {
- LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}",
- segmentName, state, instanceName);
+ LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state,
+ instanceName);
}
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
@@ -733,16 +733,16 @@ public class PinotLLCRealtimeSegmentManager {
if (committingSegmentName != null) {
// Change committing segment state to ONLINE
Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
- instanceStatesMap.put(committingSegmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instances, RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ instanceStatesMap
+ .put(committingSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
}
// Assign instances to the new segment and add instances as state CONSUMING
List<String> instancesAssigned =
segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
- instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ instanceStatesMap.put(newSegmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
}
@@ -854,7 +854,7 @@ public class PinotLLCRealtimeSegmentManager {
Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName);
if (instanceStateMap != null) {
// Latest segment of metadata is in idealstate.
- if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ if (instanceStateMap.values().contains(SegmentStateModel.CONSUMING)) {
if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
// step-1 of commmitSegmentMetadata is done (i.e. marking old segment as DONE)
@@ -868,9 +868,8 @@ public class PinotLLCRealtimeSegmentManager {
LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
- CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(latestSegmentName,
- new StreamPartitionMsgOffset(latestSegmentZKMetadata.getEndOffset()), 0);
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
+ new StreamPartitionMsgOffset(latestSegmentZKMetadata.getEndOffset()), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
@@ -883,7 +882,7 @@ public class PinotLLCRealtimeSegmentManager {
// 1. all replicas OFFLINE and metadata IN_PROGRESS/DONE - a segment marked itself OFFLINE during consumption for some reason
// 2. all replicas ONLINE and metadata DONE - Resolved in https://github.com/linkedin/pinot/pull/2890
// 3. we should never end up with some replicas ONLINE and some OFFLINE.
- if (isAllInstancesInState(instanceStateMap, RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) {
+ if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) {
LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
// Create a new segment to re-consume from the previous start offset
@@ -932,7 +931,7 @@ public class PinotLLCRealtimeSegmentManager {
for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentEntry.getKey());
if (llcSegmentName.getPartitionId() == partitionId && segmentEntry.getValue()
- .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ .containsValue(SegmentStateModel.CONSUMING)) {
previousConsumingSegment = llcSegmentName.getSegmentName();
break;
}
@@ -985,8 +984,8 @@ public class PinotLLCRealtimeSegmentManager {
new LLCSegmentName(rawTableName, partitionId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
long startOffset = getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId);
- CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null,
- new StreamPartitionMsgOffset(startOffset), 0);
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+ new CommittingSegmentDescriptor(null, new StreamPartitionMsgOffset(startOffset), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
committingSegmentDescriptor, null, instancePartitions, numPartitions, numReplicas);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index c6c5d57..5115fe1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -35,7 +35,7 @@ import org.apache.helix.model.IdealState;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
@@ -473,7 +473,7 @@ public class TableRebalancer {
for (Map.Entry<String, String> instanceStateEntry : idealStateInstanceStateMap.entrySet()) {
// Ignore OFFLINE state in IdealState
String idealStateInstanceState = instanceStateEntry.getValue();
- if (idealStateInstanceState.equals(RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) {
+ if (idealStateInstanceState.equals(SegmentStateModel.OFFLINE)) {
continue;
}
@@ -486,7 +486,7 @@ public class TableRebalancer {
String instanceName = instanceStateEntry.getKey();
String externalViewInstanceState = externalViewInstanceStateMap.get(instanceName);
if (!idealStateInstanceState.equals(externalViewInstanceState)) {
- if (RealtimeSegmentOnlineOfflineStateModel.ERROR.equals(externalViewInstanceState)) {
+ if (SegmentStateModel.ERROR.equals(externalViewInstanceState)) {
if (bestEfforts) {
LOGGER
.warn("Found ERROR instance: {} for segment: {}, table: {}, counting it as good state (best-efforts)",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index a616fc6..140e64e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -174,8 +174,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
} else {
// Delete segment if all of its replicas are OFFLINE
Set<String> states = new HashSet<>(stateMap.values());
- return states.size() == 1 && states
- .contains(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
+ return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
}
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
index d037725..104770f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
@@ -27,7 +27,7 @@ import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -95,8 +95,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest {
assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId));
expectedAssignedInstanceId = (expectedAssignedInstanceId + 1) % NUM_INSTANCES;
}
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
}
@@ -106,8 +106,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// There should be 100 segments assigned
@@ -134,8 +134,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their alphabetical order
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
index 31f3bb4..78a06bc 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
@@ -201,8 +201,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
assertEquals(instancesAssigned.get(replicaGroupId), INSTANCES.get(expectedAssignedInstanceId));
}
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
}
@@ -234,8 +234,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
assertEquals(instancesAssigned.get(replicaGroupId), INSTANCES.get(expectedAssignedInstanceId));
}
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
}
@@ -245,8 +245,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithoutPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// There should be 90 segments assigned
@@ -274,8 +274,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// There should be 90 segments assigned
@@ -303,8 +303,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithoutPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their alphabetical order
@@ -326,8 +326,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithPartition
.assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their alphabetical order within the partition
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 51a13ab..b659514 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -145,7 +145,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
String offlineSegmentName = "offlineSegment";
Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
.getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS),
- RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ SegmentStateModel.OFFLINE);
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
// Rebalance without COMPLETED instance partitions should not change the segment assignment
@@ -167,14 +167,14 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
}
} else {
// CONSUMING segments
Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
}
}
}
@@ -215,10 +215,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
for (Map.Entry<String, Map<String, String>> entry : newAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
- if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
for (int i = 0; i < NUM_REPLICAS; i++) {
String expectedInstance = COMPLETED_INSTANCES.get(index++ % NUM_COMPLETED_INSTANCES);
- assertEquals(instanceStateMap.get(expectedInstance), RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ assertEquals(instanceStateMap.get(expectedInstance), SegmentStateModel.ONLINE);
}
} else {
// CONSUMING and OFFLINE segments should not be reassigned
@@ -234,12 +234,11 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
String lastSegmentInPartition = _segments.get(segmentId - NUM_PARTITIONS);
Map<String, String> instanceStateMap = currentAssignment.get(lastSegmentInPartition);
currentAssignment.put(lastSegmentInPartition, SegmentAssignmentUtils
- .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()),
- RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()), SegmentStateModel.ONLINE));
}
// Add the new segment into the assignment as CONSUMING
- currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ currentAssignment.put(_segments.get(segmentId),
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 6eb5148..ee3c3c5 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -25,7 +25,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
@@ -170,7 +170,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
String offlineSegmentName = "offlineSegment";
Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
.getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS),
- RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ SegmentStateModel.OFFLINE);
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
// Rebalance without COMPLETED instance partitions should not change the segment assignment
@@ -192,14 +192,14 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
}
} else {
// CONSUMING segments
Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
}
}
}
@@ -239,12 +239,12 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
for (Map.Entry<String, Map<String, String>> entry : newAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
- if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
int expectedInstanceId = index++ % numCompletedInstancesPerReplicaGroup;
for (int i = 0; i < NUM_REPLICAS; i++) {
String expectedInstance =
COMPLETED_INSTANCES.get(expectedInstanceId + i * numCompletedInstancesPerReplicaGroup);
- assertEquals(instanceStateMap.get(expectedInstance), RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ assertEquals(instanceStateMap.get(expectedInstance), SegmentStateModel.ONLINE);
}
} else {
// CONSUMING and OFFLINE segments should not be reassigned
@@ -260,12 +260,11 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
String lastSegmentInPartition = _segments.get(segmentId - NUM_PARTITIONS);
Map<String, String> instanceStateMap = currentAssignment.get(lastSegmentInPartition);
currentAssignment.put(lastSegmentInPartition, SegmentAssignmentUtils
- .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()),
- RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()), SegmentStateModel.ONLINE));
}
// Add the new segment into the assignment as CONSUMING
- currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ currentAssignment.put(_segments.get(segmentId),
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index ef8364e..6e867c6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -57,8 +57,8 @@ public class SegmentAssignmentUtilsTest {
instancesAssigned.add(instances.get(assignedInstanceId));
assignedInstanceId = (assignedInstanceId + 1) % numInstances;
}
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// There should be 100 segments assigned
@@ -240,7 +240,7 @@ public class SegmentAssignmentUtilsTest {
instancesAssigned.add(instances.get(assignedInstanceId));
}
currentAssignment.put(segments.get(segmentId),
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE));
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
// There should be 90 segments assigned
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index db98072..485ed6c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants.Helix;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
@@ -157,7 +157,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertNotNull(instanceStateMap);
assertEquals(instanceStateMap.size(), numReplicas);
for (String state : instanceStateMap.values()) {
- assertEquals(state, RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(state, SegmentStateModel.CONSUMING);
}
LLCRealtimeSegmentZKMetadata segmentZKMetadata =
@@ -188,7 +188,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Commit a segment for partition 0
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS), 0L);
+ new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS),
+ 0L);
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
@@ -196,13 +197,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
assertNotNull(committedSegmentInstanceStateMap);
assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
- Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ Collections.singleton(SegmentStateModel.ONLINE));
String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
assertNotNull(consumingSegmentInstanceStateMap);
assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
- Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ Collections.singleton(SegmentStateModel.CONSUMING));
// Verify segment ZK metadata for committed segment and new consuming segment
LLCRealtimeSegmentZKMetadata committedSegmentZKMetadata =
@@ -223,11 +224,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
// Turn one instance of the consuming segment OFFLINE and commit the segment
- consumingSegmentInstanceStateMap.entrySet().iterator().next()
- .setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ consumingSegmentInstanceStateMap.entrySet().iterator().next().setValue(SegmentStateModel.OFFLINE);
committingSegment = consumingSegment;
- committingSegmentDescriptor =
- new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS + NUM_DOCS), 0L);
+ committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+ new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS + NUM_DOCS), 0L);
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
@@ -235,13 +235,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
assertNotNull(committedSegmentInstanceStateMap);
assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
- Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ Collections.singleton(SegmentStateModel.ONLINE));
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName();
consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
assertNotNull(consumingSegmentInstanceStateMap);
assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
- Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ Collections.singleton(SegmentStateModel.CONSUMING));
// Illegal segment commit - commit the segment again
try {
@@ -357,7 +357,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> instanceStateMap = instanceStatesMap.get(segmentName);
assertEquals(instanceStateMap.size(), segmentManager._numReplicas);
for (String state : instanceStateMap.values()) {
- assertEquals(state, RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(state, SegmentStateModel.CONSUMING);
}
// NOTE: Old segment ZK metadata might exist when previous round failed due to not enough instances
assertTrue(segmentZKMetadataMap.containsKey(segmentName));
@@ -513,15 +513,15 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.remove(consumingSegment);
assertNotNull(consumingSegmentInstanceStateMap);
assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
- Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ Collections.singleton(SegmentStateModel.CONSUMING));
if (latestCommittedSegment != null) {
Map<String, String> latestCommittedSegmentInstanceStateMap = instanceStatesMap.get(latestCommittedSegment);
assertNotNull(latestCommittedSegmentInstanceStateMap);
for (Map.Entry<String, String> entry : latestCommittedSegmentInstanceStateMap.entrySet()) {
// Latest committed segment should have all instances in ONLINE state
- assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE);
- entry.setValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
+ entry.setValue(SegmentStateModel.CONSUMING);
}
}
}
@@ -535,8 +535,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertNotNull(consumingSegmentInstanceStateMap);
for (Map.Entry<String, String> entry : consumingSegmentInstanceStateMap.entrySet()) {
// Consuming segment should have all instances in CONSUMING state
- assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
- entry.setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
+ entry.setValue(SegmentStateModel.OFFLINE);
}
}
@@ -581,8 +581,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> instanceStateMap = entry.getValue();
// Skip segments with all instances OFFLINE
- if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE) || instanceStateMap
- .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap
+ .containsValue(SegmentStateModel.CONSUMING)) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int partitionsId = llcSegmentName.getPartitionId();
Map<Integer, String> sequenceNumberToSegmentMap = partitionIdToSegmentsMap.get(partitionsId);
@@ -601,8 +601,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Latest segment should have CONSUMING instance but no ONLINE instance in ideal state
Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegment);
- assertTrue(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
- assertFalse(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
+ assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
// Latest segment ZK metadata should be IN_PROGRESS
assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS);
@@ -612,8 +612,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Committed segment should have all instances in ONLINE state
instanceStateMap = instanceStatesMap.get(segmentName);
- assertEquals(new HashSet<>(instanceStateMap.values()),
- Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ assertEquals(new HashSet<>(instanceStateMap.values()), Collections.singleton(SegmentStateModel.ONLINE));
// Committed segment ZK metadata should be DONE
LLCRealtimeSegmentZKMetadata segmentZKMetadata = segmentManager._segmentZKMetadataMap.get(segmentName);
@@ -654,7 +653,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Commit a segment for partition 0
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS), 0L);
+ new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS),
+ 0L);
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
try {
@@ -684,7 +684,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
String segmentLocation = SCHEME + tableDir + "/" + segmentFileName;
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(PARTITION_OFFSET), 0, segmentLocation);
+ new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(PARTITION_OFFSET), 0,
+ segmentLocation);
segmentManager.commitSegmentFile(REALTIME_TABLE_NAME, committingSegmentDescriptor);
assertFalse(segmentFile.exists());
}
@@ -709,7 +710,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
String segmentLocation = SCHEME + tableDir + "/" + segmentFileName;
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(PARTITION_OFFSET), 0, segmentLocation);
+ new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(PARTITION_OFFSET), 0,
+ segmentLocation);
segmentManager.commitSegmentFile(REALTIME_TABLE_NAME, committingSegmentDescriptor);
assertFalse(segmentFile.exists());
assertFalse(extraSegmentFile.exists());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java
index 4253c9e..d135ad1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java
@@ -41,7 +41,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index 5dcec81..a4e41f9 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -24,10 +24,10 @@ import java.util.TreeMap;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.testng.annotations.Test;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ERROR;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.OFFLINE;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 1976fa9..80a6a34 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -31,7 +31,7 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
-import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
@@ -229,7 +229,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
idealState -> {
assertNotNull(idealState);
Map<String, String> instanceStateMap = idealState.getRecord().getMapFields().values().iterator().next();
- instanceStateMap.entrySet().iterator().next().setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ instanceStateMap.entrySet().iterator().next().setValue(SegmentStateModel.OFFLINE);
return idealState;
}, RetryPolicies.fixedDelayRetryPolicy(2, 10));
@@ -324,9 +324,9 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
for (Map<String, String> instanceStateMap : idealState.getRecord().getMapFields().values()) {
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
String state = entry.getValue();
- if (state.equals(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ if (state.equals(SegmentStateModel.CONSUMING)) {
consumingServers.add(entry.getKey());
- } else if (state.equals(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
+ } else if (state.equals(SegmentStateModel.ONLINE)) {
completedServers.add(entry.getKey());
}
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index b583b81..8b5db0c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -161,7 +161,7 @@ public class HelixServerStarter implements ServiceStartable {
}
if (checkRealtime && !foundConsuming && TableNameBuilder.isRealtimeTableResource(resourceName)) {
for (String partitionName : idealState.getPartitionSet()) {
- if (StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING
+ if (StateModel.SegmentStateModel.CONSUMING
.equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
foundConsuming = true;
break;
@@ -570,8 +570,7 @@ public class HelixServerStarter implements ServiceStartable {
for (String partition : externalView.getPartitionSet()) {
Map<String, String> instanceStateMap = externalView.getStateMap(partition);
String state = instanceStateMap.get(_instanceId);
- if (StateModel.SegmentOnlineOfflineStateModel.ONLINE.equals(state)
- || StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING.equals(state)) {
+ if (StateModel.SegmentStateModel.ONLINE.equals(state) || StateModel.SegmentStateModel.CONSUMING.equals(state)) {
return false;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org