You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/03/09 05:25:43 UTC
[pinot] branch master updated: Allow replica group assignment support in tier configs (#10255)
This is an automated email from the ASF dual-hosted git repository.
saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7c3c8e8705 Allow replica group assignment support in tier configs (#10255)
7c3c8e8705 is described below
commit 7c3c8e87052edca26648a646850b2b17ec0aa690
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Thu Mar 9 10:55:35 2023 +0530
Allow replica group assignment support in tier configs (#10255)
* Allow replica group assignment support in tier configs
* Add reassignment logic
* Add tier to assignInstances
* Make changes to APIs
* Consolidate tier assignment configs inside InstanceAssignmentConfigsMap
* Add removal logic for tier partitions
* Lint fix
* Review comments
* Review comments
* Add tests
* Fix java8 quickstart
* Fix test
---------
Co-authored-by: Saurabh Dubey <sa...@Saurabhs-MacBook-Pro.local>
---
.../assignment/InstanceAssignmentConfigUtils.java | 20 +--
.../common/assignment/InstancePartitionsUtils.java | 16 ++
.../pinot/common/metadata/ZKMetadataProvider.java | 16 ++
.../common/utils/config/TableConfigUtils.java | 13 +-
.../common/utils/config/TableConfigSerDeTest.java | 10 +-
.../PinotInstanceAssignmentRestletResource.java | 188 +++++++++++++++------
.../api/resources/PinotTableRestletResource.java | 2 +-
.../helix/core/PinotHelixResourceManager.java | 32 +++-
.../instance/InstanceAssignmentDriver.java | 32 +++-
.../helix/core/rebalance/RebalanceResult.java | 8 +
.../helix/core/rebalance/TableRebalancer.java | 111 ++++++++----
...anceAssignmentRestletResourceStatelessTest.java | 141 ++++++++++------
.../instance/InstanceAssignmentTest.java | 64 +++----
.../TableRebalancerClusterStatelessTest.java | 127 +++++++++++++-
.../segment/local/utils/TableConfigUtils.java | 3 +-
.../segment/local/utils/TableConfigUtilsTest.java | 8 +-
.../apache/pinot/spi/config/table/TableConfig.java | 8 +-
.../utils/builder/ControllerRequestURLBuilder.java | 2 +-
.../spi/utils/builder/TableConfigBuilder.java | 4 +-
19 files changed, 594 insertions(+), 211 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index 6a0ae1188e..b571918c0c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -44,11 +44,10 @@ public class InstanceAssignmentConfigUtils {
* backward-compatibility) COMPLETED server tag is overridden to be different from the CONSUMING server tag.
*/
public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
return (instanceAssignmentConfigMap != null
- && instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED) != null) || TagNameUtils
- .isRelocateCompletedSegments(tableConfig.getTenantConfig());
+ && instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED.toString()) != null)
+ || TagNameUtils.isRelocateCompletedSegments(tableConfig.getTenantConfig());
}
/**
@@ -60,21 +59,20 @@ public class InstanceAssignmentConfigUtils {
return true;
}
TableType tableType = tableConfig.getTableType();
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
switch (instancePartitionsType) {
// Allow OFFLINE instance assignment if the offline table has it configured or (for backward-compatibility) is
// using replica-group segment assignment
case OFFLINE:
return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap != null
- && instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE) != null)
+ && instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString()) != null)
|| AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));
// Allow CONSUMING/COMPLETED instance assignment if the real-time table has it configured
case CONSUMING:
case COMPLETED:
return tableType == TableType.REALTIME && (instanceAssignmentConfigMap != null
- && instanceAssignmentConfigMap.get(instancePartitionsType) != null);
+ && instanceAssignmentConfigMap.get(instancePartitionsType.toString()) != null);
default:
throw new IllegalStateException();
}
@@ -89,10 +87,10 @@ public class InstanceAssignmentConfigUtils {
"Instance assignment is not allowed for the given table config");
// Use the instance assignment config from the table config if it exists
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
if (instanceAssignmentConfigMap != null) {
- InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMap.get(instancePartitionsType);
+ InstanceAssignmentConfig instanceAssignmentConfig =
+ instanceAssignmentConfigMap.get(instancePartitionsType.toString());
if (instanceAssignmentConfig != null) {
return instanceAssignmentConfig;
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
index a15554f3d3..759d387af4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
@@ -46,6 +46,7 @@ public class InstancePartitionsUtils {
}
public static final char TYPE_SUFFIX_SEPARATOR = '_';
+ public static final String TIER_SUFFIX = "__TIER__";
/**
* Returns the name of the instance partitions for the given table name (with or without type suffix) and instance
@@ -93,6 +94,11 @@ public class InstancePartitionsUtils {
return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
}
+ public static String getInstancePartitionsNameForTier(String tableName, String tierName) {
+ return TableNameBuilder.extractRawTableName(tableName) + TIER_SUFFIX + tierName;
+ }
+
+
/**
* Gets the instance partitions with the given name, and returns a re-named copy of the same.
* This method is useful when we use a table with instancePartitionsMap since in that case
@@ -177,4 +183,14 @@ public class InstancePartitionsUtils {
throw new ZkException("Failed to remove instance partitions: " + instancePartitionsName);
}
}
+
+ public static void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
+ String tableNameWithType) {
+ List<InstancePartitions> instancePartitions = ZKMetadataProvider.getAllInstancePartitions(propertyStore);
+ instancePartitions.stream().filter(instancePartition -> instancePartition.getInstancePartitionsName()
+ .startsWith(TableNameBuilder.extractRawTableName(tableNameWithType) + TIER_SUFFIX))
+ .forEach(instancePartition -> {
+ removeInstancePartitions(propertyStore, instancePartition.getInstancePartitionsName());
+ });
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 623a856454..b14ba30391 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -26,9 +26,11 @@ import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
+import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SchemaUtils;
@@ -261,6 +263,20 @@ public class ZKMetadataProvider {
}
}
+ @Nullable
+ public static List<InstancePartitions> getAllInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore) {
+ List<ZNRecord> znRecordss =
+ propertyStore.getChildren(PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, null, AccessOption.PERSISTENT);
+
+ try {
+ return Optional.ofNullable(znRecordss).orElseGet(ArrayList::new).stream().map(InstancePartitions::fromZNRecord)
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while getting instance partitions", e);
+ return null;
+ }
+ }
+
@Nullable
public static List<UserConfig> getAllUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore) {
List<ZNRecord> znRecordss =
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 8abb0ea964..9af13f44bc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -113,11 +113,11 @@ public class TableConfigUtils {
queryConfig = JsonUtils.stringToObject(queryConfigString, QueryConfig.class);
}
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap = null;
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = null;
String instanceAssignmentConfigMapString = simpleFields.get(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY);
if (instanceAssignmentConfigMapString != null) {
instanceAssignmentConfigMap = JsonUtils.stringToObject(instanceAssignmentConfigMapString,
- new TypeReference<Map<InstancePartitionsType, InstanceAssignmentConfig>>() {
+ new TypeReference<Map<String, InstanceAssignmentConfig>>() {
});
}
@@ -181,9 +181,9 @@ public class TableConfigUtils {
}
return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
- quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap,
- fieldConfigList, upsertConfig, dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, isDimTable,
- tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap);
+ quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
+ dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList,
+ instancePartitionsMap, segmentAssignmentConfigMap);
}
public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -216,8 +216,7 @@ public class TableConfigUtils {
if (queryConfig != null) {
simpleFields.put(TableConfig.QUERY_CONFIG_KEY, queryConfig.toJsonString());
}
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
if (instanceAssignmentConfigMap != null) {
simpleFields
.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(instanceAssignmentConfigMap));
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index aad62db8f7..30dbfe80b4 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -212,7 +212,7 @@ public class TableConfigSerDeTest {
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false));
TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE, instanceAssignmentConfig)).build();
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
checkInstanceAssignmentConfig(tableConfig);
@@ -488,12 +488,12 @@ public class TableConfigSerDeTest {
}
private void checkInstanceAssignmentConfig(TableConfig tableConfig) {
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
assertNotNull(instanceAssignmentConfigMap);
assertEquals(instanceAssignmentConfigMap.size(), 1);
- assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE));
- InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE);
+ assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+ InstanceAssignmentConfig instanceAssignmentConfig =
+ instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString());
InstanceTagPoolConfig tagPoolConfig = instanceAssignmentConfig.getTagPoolConfig();
assertEquals(tagPoolConfig.getTag(), "tenant_OFFLINE");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index dfdaefafaa..aee7df4e8a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -45,6 +46,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
@@ -57,6 +59,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -80,39 +83,57 @@ public class PinotInstanceAssignmentRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Path("/tables/{tableName}/instancePartitions")
@ApiOperation(value = "Get the instance partitions")
- public Map<InstancePartitionsType, InstancePartitions> getInstancePartitions(
+ public Map<String, InstancePartitions> getInstancePartitions(
@ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
- InstancePartitionsType instancePartitionsType) {
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") @QueryParam("type") @Nullable String type) {
+ Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType != TableType.REALTIME) {
- if (instancePartitionsType == InstancePartitionsType.OFFLINE || instancePartitionsType == null) {
- InstancePartitions offlineInstancePartitions = InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.OFFLINE.toString().equals(type) || type == null) {
+ InstancePartitions offlineInstancePartitions =
+ InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
if (offlineInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.OFFLINE, offlineInstancePartitions);
+ instancePartitionsMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstancePartitions);
}
}
}
if (tableType != TableType.OFFLINE) {
- if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
- InstancePartitions consumingInstancePartitions = InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.CONSUMING.toString().equals(type) || type == null) {
+ InstancePartitions consumingInstancePartitions =
+ InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
if (consumingInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
+ instancePartitionsMap.put(InstancePartitionsType.CONSUMING.toString(), consumingInstancePartitions);
}
}
- if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
- InstancePartitions completedInstancePartitions = InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.COMPLETED.toString().equals(type) || type == null) {
+ InstancePartitions completedInstancePartitions =
+ InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
if (completedInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions);
+ instancePartitionsMap.put(InstancePartitionsType.COMPLETED.toString(), completedInstancePartitions);
+ }
+ }
+ }
+
+ List<TableConfig> tableConfigs = Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+ _resourceManager.getOfflineTableConfig(tableName));
+
+ for (TableConfig tableConfig : tableConfigs) {
+ if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if (type == null || type.equals(tierConfig.getName())) {
+ InstancePartitions instancePartitions =
+ InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+ tierConfig.getName()));
+ if (instancePartitions != null) {
+ instancePartitionsMap.put(tierConfig.getName(), instancePartitions);
+ }
+ }
}
}
}
@@ -130,22 +151,20 @@ public class PinotInstanceAssignmentRestletResource {
@Path("/tables/{tableName}/assignInstances")
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Assign server instances to a table")
- public Map<InstancePartitionsType, InstancePartitions> assignInstances(
+ public Map<String, InstancePartitions> assignInstances(
@ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
- InstancePartitionsType instancePartitionsType,
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") @QueryParam("type") @Nullable String type,
@ApiParam(value = "Whether to do dry-run") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun) {
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
+ Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
List<InstanceConfig> instanceConfigs = _resourceManager.getAllHelixInstanceConfigs();
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType != TableType.REALTIME && (instancePartitionsType == InstancePartitionsType.OFFLINE
- || instancePartitionsType == null)) {
+ if (tableType != TableType.REALTIME && (InstancePartitionsType.OFFLINE.toString().equals(type) || type == null)) {
TableConfig offlineTableConfig = _resourceManager.getOfflineTableConfig(tableName);
if (offlineTableConfig != null) {
try {
- if (InstanceAssignmentConfigUtils
- .allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) {
+ if (InstanceAssignmentConfigUtils.allowInstanceAssignment(offlineTableConfig,
+ InstancePartitionsType.OFFLINE)) {
assignInstancesForInstancePartitionsType(instancePartitionsMap, offlineTableConfig, instanceConfigs,
InstancePartitionsType.OFFLINE);
}
@@ -158,20 +177,20 @@ public class PinotInstanceAssignmentRestletResource {
}
}
}
- if (tableType != TableType.OFFLINE && instancePartitionsType != InstancePartitionsType.OFFLINE) {
+ if (tableType != TableType.OFFLINE && !InstancePartitionsType.OFFLINE.toString().equals(type)) {
TableConfig realtimeTableConfig = _resourceManager.getRealtimeTableConfig(tableName);
if (realtimeTableConfig != null) {
try {
- if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
- if (InstanceAssignmentConfigUtils
- .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) {
+ if (InstancePartitionsType.CONSUMING.toString().equals(type) || type == null) {
+ if (InstanceAssignmentConfigUtils.allowInstanceAssignment(realtimeTableConfig,
+ InstancePartitionsType.CONSUMING)) {
assignInstancesForInstancePartitionsType(instancePartitionsMap, realtimeTableConfig, instanceConfigs,
InstancePartitionsType.CONSUMING);
}
}
- if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
- if (InstanceAssignmentConfigUtils
- .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) {
+ if (InstancePartitionsType.COMPLETED.toString().equals(type) || type == null) {
+ if (InstanceAssignmentConfigUtils.allowInstanceAssignment(realtimeTableConfig,
+ InstancePartitionsType.COMPLETED)) {
assignInstancesForInstancePartitionsType(instancePartitionsMap, realtimeTableConfig, instanceConfigs,
InstancePartitionsType.COMPLETED);
}
@@ -186,6 +205,16 @@ public class PinotInstanceAssignmentRestletResource {
}
}
+ TableConfig realtimeTableConfig = _resourceManager.getRealtimeTableConfig(tableName);
+ if (realtimeTableConfig != null) {
+ assignInstancesForTier(instancePartitionsMap, realtimeTableConfig, instanceConfigs, type);
+ }
+
+ TableConfig offlineTableConfig = _resourceManager.getOfflineTableConfig(tableName);
+ if (offlineTableConfig != null) {
+ assignInstancesForTier(instancePartitionsMap, offlineTableConfig, instanceConfigs, type);
+ }
+
if (instancePartitionsMap.isEmpty()) {
throw new ControllerApplicationException(LOGGER, "Failed to find the instance assignment config",
Response.Status.NOT_FOUND);
@@ -207,22 +236,43 @@ public class PinotInstanceAssignmentRestletResource {
* @param instanceConfigs list of instance configs
* @param instancePartitionsType type of instancePartitions
*/
- private void assignInstancesForInstancePartitionsType(
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
- List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
+ private void assignInstancesForInstancePartitionsType(Map<String, InstancePartitions> instancePartitionsMap,
+ TableConfig tableConfig, List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
- instancePartitionsMap.put(instancePartitionsType, InstancePartitionsUtils.fetchInstancePartitionsWithRename(
- _resourceManager.getPropertyStore(), tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
- instancePartitionsType.getInstancePartitionsName(rawTableName)));
+ instancePartitionsMap.put(instancePartitionsType.toString(),
+ InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+ tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
+ instancePartitionsType.getInstancePartitionsName(rawTableName)));
return;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
- instancePartitionsMap.put(instancePartitionsType, new InstanceAssignmentDriver(tableConfig)
- .assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions));
+ instancePartitionsMap.put(instancePartitionsType.toString(),
+ new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
+ existingInstancePartitions));
+ }
+
+ private void assignInstancesForTier(Map<String, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
+ List<InstanceConfig> instanceConfigs, String tierName) {
+ if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+ && tableConfig.getInstanceAssignmentConfigMap() != null) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if ((tierConfig.getName().equals(tierName) || tierName == null)
+ && tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()) != null) {
+ InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions(
+ _resourceManager.getHelixZkManager().getHelixPropertyStore(),
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+ tierConfig.getName()));
+
+ instancePartitionsMap.put(tierConfig.getName(),
+ new InstanceAssignmentDriver(tableConfig).assignInstances(tierConfig.getName(), instanceConfigs,
+ existingInstancePartitions, tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName())));
+ }
+ }
+ }
}
private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
@@ -240,7 +290,7 @@ public class PinotInstanceAssignmentRestletResource {
@Path("/tables/{tableName}/instancePartitions")
@Authenticate(AccessType.UPDATE)
@ApiOperation(value = "Create/update the instance partitions")
- public Map<InstancePartitionsType, InstancePartitions> setInstancePartitions(
+ public Map<String, InstancePartitions> setInstancePartitions(
@ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, String instancePartitionsStr) {
InstancePartitions instancePartitions;
try {
@@ -256,17 +306,32 @@ public class PinotInstanceAssignmentRestletResource {
if (tableType != TableType.REALTIME) {
if (InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) {
persistInstancePartitionsHelper(instancePartitions);
- return Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitions);
+ return Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instancePartitions);
}
}
if (tableType != TableType.OFFLINE) {
if (InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) {
persistInstancePartitionsHelper(instancePartitions);
- return Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
+ return Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), instancePartitions);
}
if (InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) {
persistInstancePartitionsHelper(instancePartitions);
- return Collections.singletonMap(InstancePartitionsType.COMPLETED, instancePartitions);
+ return Collections.singletonMap(InstancePartitionsType.COMPLETED.toString(), instancePartitions);
+ }
+ }
+
+ List<TableConfig> tableConfigs = Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+ _resourceManager.getOfflineTableConfig(tableName));
+
+ for (TableConfig tableConfig : tableConfigs) {
+ if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if (InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName())
+ .equals(instancePartitionsName)) {
+ persistInstancePartitionsHelper(instancePartitions);
+ return Collections.singletonMap(tierConfig.getName(), instancePartitions);
+ }
+ }
}
}
@@ -281,22 +346,39 @@ public class PinotInstanceAssignmentRestletResource {
@ApiOperation(value = "Remove the instance partitions")
public SuccessResponse removeInstancePartitions(
@ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
- InstancePartitionsType instancePartitionsType) {
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") @QueryParam("type") @Nullable
+ String instancePartitionsType) {
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType != TableType.REALTIME && (instancePartitionsType == InstancePartitionsType.OFFLINE
+ if (tableType != TableType.REALTIME && (InstancePartitionsType.OFFLINE.toString().equals(instancePartitionsType)
|| instancePartitionsType == null)) {
removeInstancePartitionsHelper(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
}
if (tableType != TableType.OFFLINE) {
- if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
+ if (InstancePartitionsType.CONSUMING.toString().equals(instancePartitionsType)
+ || instancePartitionsType == null) {
removeInstancePartitionsHelper(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
}
- if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
+ if (InstancePartitionsType.COMPLETED.toString().equals(instancePartitionsType)
+ || instancePartitionsType == null) {
removeInstancePartitionsHelper(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
}
}
+
+ List<TableConfig> tableConfigs = Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+ _resourceManager.getOfflineTableConfig(tableName));
+
+ for (TableConfig tableConfig : tableConfigs) {
+ if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if (instancePartitionsType == null || instancePartitionsType.equals(tierConfig.getName())) {
+ removeInstancePartitionsHelper(
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+ tierConfig.getName()));
+ }
+ }
+ }
+ }
return new SuccessResponse("Instance partitions removed");
}
@@ -315,16 +397,16 @@ public class PinotInstanceAssignmentRestletResource {
@Path("/tables/{tableName}/replaceInstance")
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Replace an instance in the instance partitions")
- public Map<InstancePartitionsType, InstancePartitions> replaceInstance(
+ public Map<String, InstancePartitions> replaceInstance(
@ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
- InstancePartitionsType instancePartitionsType,
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") @QueryParam("type") @Nullable
+ String type,
@ApiParam(value = "Old instance to be replaced", required = true) @QueryParam("oldInstanceId")
String oldInstanceId,
@ApiParam(value = "New instance to replace with", required = true) @QueryParam("newInstanceId")
String newInstanceId) {
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
- getInstancePartitions(tableName, instancePartitionsType);
+ Map<String, InstancePartitions> instancePartitionsMap =
+ getInstancePartitions(tableName, type);
Iterator<InstancePartitions> iterator = instancePartitionsMap.values().iterator();
while (iterator.hasNext()) {
InstancePartitions instancePartitions = iterator.next();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index cfa1cffe2d..e43b244893 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -683,7 +683,7 @@ public class PinotTableRestletResource {
});
return new RebalanceResult(RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(),
- dryRunResult.getSegmentAssignment());
+ dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment());
} else {
// If dry-run failed or is no-op, return the dry-run result
return dryRunResult;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 91dc57956f..48418f2bda 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -147,6 +147,7 @@ import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
@@ -1734,10 +1735,10 @@ public class PinotHelixResourceManager {
}
}
+ InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
+ List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
if (!instancePartitionsTypesToAssign.isEmpty()) {
LOGGER.info("Assigning {} instances to table: {}", instancePartitionsTypesToAssign, tableNameWithType);
- InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
- List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
boolean hasPreConfiguredInstancePartitions =
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType);
@@ -1757,6 +1758,26 @@ public class PinotHelixResourceManager {
}
}
}
+
+ // Process and persist tier config instancePartitions
+ if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+ && tableConfig.getInstanceAssignmentConfigMap() != null) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if (tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName())) {
+ if (override || InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tierConfig.getName()))
+ == null) {
+ LOGGER.info("Calculating instance partitions for tier: {}, table : {}", tierConfig.getName(),
+ tableNameWithType);
+ InstancePartitions instancePartitions =
+ instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null,
+ tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
+ LOGGER.info("Persisting instance partitions: {}", instancePartitions);
+ InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
+ }
+ }
+ }
+ }
}
public void updateUserConfig(UserConfig userConfig)
@@ -1909,6 +1930,10 @@ public class PinotHelixResourceManager {
InstancePartitionsUtils.removeInstancePartitions(_propertyStore, offlineTableName);
LOGGER.info("Deleting table {}: Removed instance partitions", offlineTableName);
+ // Remove tier instance partitions
+ InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, offlineTableName);
+ LOGGER.info("Deleting table {}: Removed tier instance partitions", offlineTableName);
+
// Remove segment lineage
SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, offlineTableName);
LOGGER.info("Deleting table {}: Removed segment lineage", offlineTableName);
@@ -1968,6 +1993,9 @@ public class PinotHelixResourceManager {
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
LOGGER.info("Deleting table {}: Removed instance partitions", realtimeTableName);
+ InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, rawTableName);
+ LOGGER.info("Deleting table {}: Removed tier instance partitions", realtimeTableName);
+
// Remove segment lineage
SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, realtimeTableName);
LOGGER.info("Deleting table {}: Removed segment lineage", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index ba73f7bd6d..7a5c901029 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
@@ -55,15 +56,31 @@ public class InstanceAssignmentDriver {
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
- LOGGER.info("Starting {} instance assignment for table: {}", instancePartitionsType, tableNameWithType);
-
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
+ return getInstancePartitions(
+ instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
+ assignmentConfig, instanceConfigs, existingInstancePartitions);
+ }
+
+ public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) {
+ return getInstancePartitions(
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName),
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions);
+ }
+
+ private InstancePartitions getInstancePartitions(String instancePartitionsName,
+ InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions) {
+ String tableNameWithType = _tableConfig.getTableName();
+ LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
+
InstanceTagPoolSelector tagPoolSelector =
- new InstanceTagPoolSelector(assignmentConfig.getTagPoolConfig(), tableNameWithType);
+ new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
- InstanceConstraintConfig constraintConfig = assignmentConfig.getConstraintConfig();
+ InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
List<InstanceConstraintApplier> constraintAppliers = new ArrayList<>();
if (constraintConfig == null) {
LOGGER.info("No instance constraint is configured, using default hash-based-rotate instance constraint");
@@ -75,10 +92,9 @@ public class InstanceAssignmentDriver {
}
InstancePartitionSelector instancePartitionSelector =
- InstancePartitionSelectorFactory.getInstance(assignmentConfig.getPartitionSelector(),
- assignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions);
- InstancePartitions instancePartitions = new InstancePartitions(
- instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
+ InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
+ instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions);
+ InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
return instancePartitions;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index a2d6d630d0..4234abc5d5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
public class RebalanceResult {
private final Status _status;
private final Map<InstancePartitionsType, InstancePartitions> _instanceAssignment;
+ private final Map<String, InstancePartitions> _tierInstanceAssignment;
private final Map<String, Map<String, String>> _segmentAssignment;
private final String _description;
@@ -38,10 +39,12 @@ public class RebalanceResult {
public RebalanceResult(@JsonProperty(value = "status", required = true) Status status,
@JsonProperty(value = "description", required = true) String description,
@JsonProperty("instanceAssignment") @Nullable Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
+ @JsonProperty("tierInstanceAssignment") @Nullable Map<String, InstancePartitions> tierInstanceAssignment,
@JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment) {
_status = status;
_description = description;
_instanceAssignment = instanceAssignment;
+ _tierInstanceAssignment = tierInstanceAssignment;
_segmentAssignment = segmentAssignment;
}
@@ -60,6 +63,11 @@ public class RebalanceResult {
return _instanceAssignment;
}
+ @JsonProperty
+ public Map<String, InstancePartitions> getTierInstanceAssignment() {
+ return _tierInstanceAssignment;
+ }
+
@JsonProperty
public Map<String, Map<String, String>> getSegmentAssignment() {
return _segmentAssignment;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index b8257ed450..218193e210 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -163,13 +163,13 @@ public class TableRebalancer {
IngestionConfigUtils.getStreamConfigMap(tableConfig)).hasHighLevelConsumerType()) {
LOGGER.warn("Cannot rebalance table: {} with high-level consumer, aborting the rebalance", tableNameWithType);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot rebalance table with high-level consumer",
- null, null);
+ null, null, null);
}
} catch (Exception e) {
LOGGER.warn("Caught exception while validating table config for table: {}, aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while validating table config: " + e,
- null, null);
+ null, null, null);
}
// Fetch ideal state
@@ -181,16 +181,17 @@ public class TableRebalancer {
LOGGER.warn("Caught exception while fetching IdealState for table: {}, aborting the rebalance", tableNameWithType,
e);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while fetching IdealState: " + e,
- null, null);
+ null, null, null);
}
if (currentIdealState == null) {
LOGGER.warn("Cannot find the IdealState for table: {}, aborting the rebalance", tableNameWithType);
- return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find the IdealState for table", null, null);
+ return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find the IdealState for table", null, null,
+ null);
}
if (!currentIdealState.isEnabled() && !downtime) {
LOGGER.warn("Cannot rebalance disabled table: {} without downtime, aborting the rebalance", tableNameWithType);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot rebalance disabled table without downtime",
- null, null);
+ null, null, null);
}
LOGGER.info("Fetching/computing instance partitions, reassigning instances if configured for table: {}",
@@ -205,13 +206,13 @@ public class TableRebalancer {
"Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED,
- "Caught exception while fetching/calculating instance partitions: " + e, null, null);
+ "Caught exception while fetching/calculating instance partitions: " + e, null, null, null);
}
// Calculate instance partitions for tiers if configured
List<Tier> sortedTiers = getSortedTiers(tableConfig);
Map<String, InstancePartitions> tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun);
LOGGER.info("Calculating the target assignment for table: {}", tableNameWithType);
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -224,7 +225,8 @@ public class TableRebalancer {
LOGGER.warn("Caught exception while calculating target assignment for table: {}, aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED,
- "Caught exception while calculating target assignment: " + e, instancePartitionsMap, null);
+ "Caught exception while calculating target assignment: " + e, instancePartitionsMap,
+ tierToInstancePartitionsMap, null);
}
if (currentAssignment.equals(targetAssignment)) {
@@ -233,20 +235,21 @@ public class TableRebalancer {
if (dryRun) {
return new RebalanceResult(RebalanceResult.Status.DONE,
"Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap,
- targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment);
} else {
return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced",
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
}
} else {
return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap,
- targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment);
}
}
if (dryRun) {
LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target assignment", tableNameWithType);
- return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap, targetAssignment);
+ return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap,
+ tierToInstancePartitionsMap, targetAssignment);
}
if (downtime) {
@@ -267,12 +270,13 @@ public class TableRebalancer {
System.currentTimeMillis() - startTimeMs);
return new RebalanceResult(RebalanceResult.Status.DONE,
"Success with downtime (replaced IdealState with the target segment assignment, ExternalView might not "
- + "reach the target segment assignment yet)", instancePartitionsMap, targetAssignment);
+ + "reach the target segment assignment yet)", instancePartitionsMap, tierToInstancePartitionsMap,
+ targetAssignment);
} catch (Exception e) {
LOGGER.warn("Caught exception while updating IdealState for table: {}, aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e,
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
}
}
@@ -295,7 +299,7 @@ public class TableRebalancer {
+ "replicas: {}, aborting the rebalance", minReplicasToKeepUpForNoDowntime, tableNameWithType,
numReplicas);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min available replicas config",
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
}
minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
} else {
@@ -321,7 +325,7 @@ public class TableRebalancer {
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED,
"Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap,
- targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment);
}
// Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge
@@ -353,7 +357,8 @@ public class TableRebalancer {
try {
// Re-calculate the instance partitions in case the instance configs changed during the rebalance
instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false);
- tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
+ tierToInstancePartitionsMap =
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun);
targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
@@ -362,7 +367,7 @@ public class TableRebalancer {
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED,
"Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
- targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment);
}
} else {
LOGGER.info(
@@ -384,7 +389,7 @@ public class TableRebalancer {
return new RebalanceResult(RebalanceResult.Status.DONE,
"Success with minAvailableReplicas: " + minAvailableReplicas
+ " (both IdealState and ExternalView should reach the target segment assignment)",
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
}
Map<String, Map<String, String>> nextAssignment =
@@ -412,7 +417,7 @@ public class TableRebalancer {
LOGGER.warn("Caught exception while updating IdealState for table: {}, aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e,
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
}
}
}
@@ -524,31 +529,73 @@ public class TableRebalancer {
}
@Nullable
- private Map<String, InstancePartitions> getTierToInstancePartitionsMap(String tableNameWithType,
- @Nullable List<Tier> sortedTiers) {
+ private Map<String, InstancePartitions> getTierToInstancePartitionsMap(TableConfig tableConfig,
+ @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean bootstrap, boolean dryRun) {
if (sortedTiers == null) {
return null;
}
Map<String, InstancePartitions> tierToInstancePartitionsMap = new HashMap<>();
for (Tier tier : sortedTiers) {
LOGGER.info("Fetching/computing instance partitions for tier: {} of table: {}", tier.getName(),
- tableNameWithType);
- tierToInstancePartitionsMap.put(tier.getName(), getInstancePartitionsForTier(tier, tableNameWithType));
+ tableConfig.getTableName());
+ tierToInstancePartitionsMap.put(tier.getName(),
+ getInstancePartitionsForTier(tableConfig, tier, reassignInstances, bootstrap, dryRun));
}
return tierToInstancePartitionsMap;
}
/**
- * Creates a default instance assignment for the tier.
- * TODO: We only support default server-tag based assignment currently.
- * In next iteration, we will add InstanceAssignmentConfig to the TierConfig and also support persisting of the
- * InstancePartitions to zk.
- * Then we'll be able to support replica group assignment while creating InstancePartitions for tiers
+ * Computes the instance partitions for the given tier. If table's instanceAssignmentConfigMap has an entry for the
+ * tier, it's used to calculate the instance partitions. Otherwise, default instance partitions are returned.
*/
- private InstancePartitions getInstancePartitionsForTier(Tier tier, String tableNameWithType) {
+ private InstancePartitions getInstancePartitionsForTier(TableConfig tableConfig, Tier tier, boolean reassignInstances,
+ boolean bootstrap, boolean dryRun) {
PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage();
- return InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableNameWithType,
- tier.getName(), storage.getServerTag());
+ InstancePartitions defaultInstancePartitions =
+ InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableConfig.getTableName(),
+ tier.getName(), storage.getServerTag());
+
+ if (tableConfig.getInstanceAssignmentConfigMap() == null || !tableConfig.getInstanceAssignmentConfigMap()
+ .containsKey(tier.getName())) {
+ LOGGER.info("Skipping fetching/computing instance partitions for tier {} for table: {}", tier.getName(),
+ tableConfig.getTableName());
+ if (!dryRun) {
+ String instancePartitionsName =
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName());
+ LOGGER.info("Removing instance partitions: {} from ZK if it exists", instancePartitionsName);
+ InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
+ }
+ return defaultInstancePartitions;
+ }
+
+ String tableNameWithType = tableConfig.getTableName();
+ String instancePartitionsName =
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName());
+ if (reassignInstances) {
+ // Set existing instance partition to null if bootstrap mode is enabled, so that the instance partition
+ // map can be fully recalculated.
+ InstancePartitions existingInstancePartitions = bootstrap ? null
+ : InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+ instancePartitionsName);
+ InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
+ InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(tier.getName(),
+ _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
+ existingInstancePartitions, tableConfig.getInstanceAssignmentConfigMap().get(tier.getName()));
+ if (!dryRun) {
+ LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
+ InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
+ }
+ return instancePartitions;
+ }
+
+ InstancePartitions instancePartitions =
+ InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tier.getName()));
+ if (instancePartitions != null) {
+ return instancePartitions;
+ }
+
+ return defaultInstancePartitions;
}
private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index 38334bb25a..ef95da5135 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -21,15 +21,19 @@ package org.apache.pinot.controller.api;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -59,6 +63,8 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
private static final String RAW_TABLE_NAME = "testTable";
private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
+ private static final String TIER_NAME = "tier1";
+
@BeforeClass
public void setUp()
throws Exception {
@@ -114,13 +120,13 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
offlineTableConfig.setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE, offlineInstanceAssignmentConfig));
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig));
_helixResourceManager.setExistingTableConfig(offlineTableConfig);
// OFFLINE instance partitions should be generated
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = getInstancePartitionsMap();
+ Map<String, InstancePartitions> instancePartitionsMap = getInstancePartitionsMap();
assertEquals(instancePartitionsMap.size(), 1);
- InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(offlineInstancePartitions);
assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
@@ -132,72 +138,108 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
realtimeTableConfig.setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingInstanceAssignmentConfig));
+ Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig));
_helixResourceManager.setExistingTableConfig(realtimeTableConfig);
// CONSUMING instance partitions should be generated
instancePartitionsMap = getInstancePartitionsMap();
assertEquals(instancePartitionsMap.size(), 2);
- offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(offlineInstancePartitions);
assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
- InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ InstancePartitions consumingInstancePartitions =
+ instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
assertNotNull(consumingInstancePartitions);
assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
assertEquals(consumingInstancePartitions.getInstances(0, 0).size(), 1);
String consumingInstanceId = consumingInstancePartitions.getInstances(0, 0).get(0);
+ // Add tier config and tier instance assignment config to the offline table config
+ offlineTableConfig.setTierConfigsList(Collections.singletonList(
+ new TierConfig(TIER_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), null,
+ null)));
+ InstanceAssignmentConfig tierInstanceAssignmentConfig = new InstanceAssignmentConfig(
+ new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>();
+ instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig);
+ instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
+ offlineTableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap);
+ _helixResourceManager.setExistingTableConfig(offlineTableConfig);
+
+ // Tier instance partitions should be generated
+ Map<String, InstancePartitions> tierInstancePartitionsMap = getInstancePartitionsMap();
+ assertEquals(tierInstancePartitionsMap.size(), 3);
+ InstancePartitions tierInstancePartitions = tierInstancePartitionsMap.get(TIER_NAME);
+ assertNotNull(tierInstancePartitions);
+ assertEquals(tierInstancePartitions.getNumReplicaGroups(), 1);
+ assertEquals(tierInstancePartitions.getNumPartitions(), 1);
+ assertEquals(tierInstancePartitions.getInstances(0, 0).size(), 1);
+
// Use OFFLINE instance assignment config as the COMPLETED instance assignment config
- realtimeTableConfig.setInstanceAssignmentConfigMap(
- new TreeMap<InstancePartitionsType, InstanceAssignmentConfig>() {{
- put(InstancePartitionsType.CONSUMING, consumingInstanceAssignmentConfig);
- put(InstancePartitionsType.COMPLETED, offlineInstanceAssignmentConfig);
- }});
+ realtimeTableConfig.setInstanceAssignmentConfigMap(new TreeMap<String, InstanceAssignmentConfig>() {{
+ put(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig);
+ put(InstancePartitionsType.COMPLETED.toString(), offlineInstanceAssignmentConfig);
+ }});
_helixResourceManager.setExistingTableConfig(realtimeTableConfig);
// COMPLETED instance partitions should be generated
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 3);
- offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ assertEquals(instancePartitionsMap.size(), 4);
+ offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(offlineInstancePartitions);
assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
- consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
assertNotNull(consumingInstancePartitions);
assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(consumingInstanceId));
- InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+ InstancePartitions completedInstancePartitions =
+ instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString());
assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(completedInstancePartitions.getNumPartitions(), 1);
assertEquals(completedInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
+ InstancePartitions tInstancePartitions = instancePartitionsMap.get(TIER_NAME);
+ assertEquals(tInstancePartitions.getNumReplicaGroups(), 1);
+ assertEquals(tInstancePartitions.getNumPartitions(), 1);
+ assertEquals(tInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
// Test fetching instance partitions by table name with type suffix
instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME),
null)));
- assertEquals(instancePartitionsMap.size(), 1);
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+ assertEquals(instancePartitionsMap.size(), 2);
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+ assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
null)));
assertEquals(instancePartitionsMap.size(), 2);
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING.toString()));
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
// Test fetching instance partitions by table name and instance partitions type
for (InstancePartitionsType instancePartitionsType : InstancePartitionsType.values()) {
- instancePartitionsMap = deserializeInstancePartitionsMap(
- sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, instancePartitionsType)));
+ instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
+ _controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, instancePartitionsType.toString())));
assertEquals(instancePartitionsMap.size(), 1);
- assertEquals(instancePartitionsMap.get(instancePartitionsType).getInstancePartitionsName(),
+ assertEquals(instancePartitionsMap.get(instancePartitionsType.toString()).getInstancePartitionsName(),
instancePartitionsType.getInstancePartitionsName(RAW_TABLE_NAME));
}
+ // Test fetching instance partitions by table name and tier name
+ instancePartitionsMap = deserializeInstancePartitionsMap(
+ sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, TIER_NAME)));
+ assertEquals(instancePartitionsMap.size(), 1);
+ assertEquals(instancePartitionsMap.get(TIER_NAME).getInstancePartitionsName(),
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(RAW_TABLE_NAME, TIER_NAME));
+
// Remove the instance partitions for both offline and real-time table
sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null));
try {
@@ -210,21 +252,25 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
// Assign instances without instance partitions type (dry run)
instancePartitionsMap = deserializeInstancePartitionsMap(
sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, null, true), null));
- assertEquals(instancePartitionsMap.size(), 3);
- offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ assertEquals(instancePartitionsMap.size(), 4);
+ offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(offlineInstancePartitions);
assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
- consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
assertNotNull(consumingInstancePartitions);
assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(consumingInstanceId));
- completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+ completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString());
assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(completedInstancePartitions.getNumPartitions(), 1);
assertEquals(completedInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
+ tInstancePartitions = instancePartitionsMap.get(TIER_NAME);
+ assertEquals(tInstancePartitions.getNumReplicaGroups(), 1);
+ assertEquals(tInstancePartitions.getNumPartitions(), 1);
+ assertEquals(tInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId));
// Instance partitions should not be persisted
try {
@@ -239,34 +285,36 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
// Instance partitions should be persisted
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 3);
+ assertEquals(instancePartitionsMap.size(), 4);
// Remove the instance partitions for real-time table
sendDeleteRequest(
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
null));
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 1);
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+ assertEquals(instancePartitionsMap.size(), 2);
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+ assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
// Assign instances for COMPLETED segments
instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, InstancePartitionsType.COMPLETED, false), null));
assertEquals(instancePartitionsMap.size(), 1);
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
// There should be OFFLINE and COMPLETED instance partitions persisted
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 2);
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+ assertEquals(instancePartitionsMap.size(), 3);
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+ assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
// Replace OFFLINE instance with CONSUMING instance for COMPLETED instance partitions
instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
_controllerRequestURLBuilder.forInstanceReplace(RAW_TABLE_NAME, InstancePartitionsType.COMPLETED,
offlineInstanceId, consumingInstanceId), null));
assertEquals(instancePartitionsMap.size(), 1);
- assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0, 0),
+ assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString()).getInstances(0, 0),
Collections.singletonList(consumingInstanceId));
// Replace the instance again using real-time table name (old instance does not exist)
@@ -284,26 +332,27 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null),
consumingInstancePartitions.toJsonString()));
assertEquals(instancePartitionsMap.size(), 1);
- assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0),
+ assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString()).getInstances(0, 0),
Collections.singletonList(consumingInstanceId));
// OFFLINE instance partitions should have OFFLINE instance, CONSUMING and COMPLETED instance partitions should have
// CONSUMING instance
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 3);
- assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE).getInstances(0, 0),
+ assertEquals(instancePartitionsMap.size(), 4);
+ assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString()).getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
- assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0),
+ assertEquals(instancePartitionsMap.get(TIER_NAME).getInstances(0, 0), Collections.singletonList(offlineInstanceId));
+ assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString()).getInstances(0, 0),
Collections.singletonList(consumingInstanceId));
- assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0, 0),
+ assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString()).getInstances(0, 0),
Collections.singletonList(consumingInstanceId));
// Delete the offline table
_helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
instancePartitionsMap = getInstancePartitionsMap();
assertEquals(instancePartitionsMap.size(), 2);
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
- assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING.toString()));
+ assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
// Delete the real-time table
_helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
@@ -315,18 +364,16 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
}
}
- private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap()
+ private Map<String, InstancePartitions> getInstancePartitionsMap()
throws Exception {
return deserializeInstancePartitionsMap(
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null)));
}
- private Map<InstancePartitionsType, InstancePartitions> deserializeInstancePartitionsMap(
- String instancePartitionsMapString)
+ private Map<String, InstancePartitions> deserializeInstancePartitionsMap(String instancePartitionsMapString)
throws Exception {
- return JsonUtils.stringToObject(instancePartitionsMapString,
- new TypeReference<Map<InstancePartitionsType, InstancePartitions>>() {
- });
+ return JsonUtils.stringToObject(instancePartitionsMapString, new TypeReference<Map<String, InstancePartitions>>() {
+ });
}
@AfterClass
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 4715271fc5..53bd2da317 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -352,7 +352,7 @@ public class InstanceAssignmentTest {
InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))).build();
InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
@@ -405,7 +405,7 @@ public class InstanceAssignmentTest {
// Select all 3 pools in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
// Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
@@ -427,7 +427,7 @@ public class InstanceAssignmentTest {
// Select pool 0 and 1 in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -449,7 +449,7 @@ public class InstanceAssignmentTest {
// Assign instances from 2 pools to 3 replica-groups
numReplicaGroups = numPools;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -479,7 +479,7 @@ public class InstanceAssignmentTest {
numPools = 2;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
// Reset the instance configs to have only two pools.
instanceConfigs.clear();
@@ -528,7 +528,7 @@ public class InstanceAssignmentTest {
// Select pool 0 and 1 in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
// Get the latest existingInstancePartitions from last computation.
@@ -555,7 +555,7 @@ public class InstanceAssignmentTest {
// Assign instances from 2 pools to 3 replica-groups
numReplicaGroups = 3;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
// Get the latest existingInstancePartitions from last computation.
@@ -634,7 +634,7 @@ public class InstanceAssignmentTest {
// Reduce number of replica groups from 3 to 2.
numReplicaGroups = 2;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
// Get the latest existingInstancePartitions from last computation.
@@ -761,7 +761,7 @@ public class InstanceAssignmentTest {
InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, false, 0, null);
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// No instance with correct tag
@@ -791,7 +791,7 @@ public class InstanceAssignmentTest {
// Enable pool
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// No instance has correct pool configured
@@ -825,7 +825,7 @@ public class InstanceAssignmentTest {
assertEquals(instancePartitions.getInstances(0, 0), expectedInstances);
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// Ask for too many pools
@@ -837,7 +837,7 @@ public class InstanceAssignmentTest {
}
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 2));
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// Ask for pool that does not exist
@@ -850,7 +850,7 @@ public class InstanceAssignmentTest {
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// Ask for too many instances
@@ -863,7 +863,7 @@ public class InstanceAssignmentTest {
// Enable replica-group
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// Number of replica-groups must be positive
@@ -875,7 +875,7 @@ public class InstanceAssignmentTest {
}
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// Ask for too many replica-groups
@@ -888,7 +888,7 @@ public class InstanceAssignmentTest {
}
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// Ask for too many instances
@@ -900,7 +900,7 @@ public class InstanceAssignmentTest {
}
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// Ask for too many instances per partition
@@ -913,7 +913,7 @@ public class InstanceAssignmentTest {
}
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
// Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
@@ -952,7 +952,7 @@ public class InstanceAssignmentTest {
try {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
@@ -979,7 +979,7 @@ public class InstanceAssignmentTest {
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1011,7 +1011,7 @@ public class InstanceAssignmentTest {
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1051,7 +1051,7 @@ public class InstanceAssignmentTest {
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1089,7 +1089,7 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
numInstancesPerPartition, false);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
@@ -1161,7 +1161,7 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
numInstancesPerPartition, true);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1242,7 +1242,7 @@ public class InstanceAssignmentTest {
SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(
Collections.singletonMap(partitionColumnName, new ColumnPartitionConfig("Modulo", numPartitionsSegment, null)));
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup))
@@ -1316,7 +1316,7 @@ public class InstanceAssignmentTest {
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1372,7 +1372,7 @@ public class InstanceAssignmentTest {
instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1439,7 +1439,7 @@ public class InstanceAssignmentTest {
// Do not rotate pool sequence (for testing)
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1505,7 +1505,7 @@ public class InstanceAssignmentTest {
// Do not rotate pool sequence (for testing)
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1576,7 +1576,7 @@ public class InstanceAssignmentTest {
instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1627,7 +1627,7 @@ public class InstanceAssignmentTest {
// Do not rotate pool sequence (for testing)
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1691,7 +1691,7 @@ public class InstanceAssignmentTest {
// Do not rotate pool sequence (for testing)
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
- .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 4c006967d1..4b9e869dcf 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -196,7 +196,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null);
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false);
- tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
_helixResourceManager.updateTableConfig(tableConfig);
@@ -403,6 +403,131 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
assertTrue(instance.startsWith(expectedPrefix));
}
}
+ _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
+ }
+
+ @Test
+ public void testRebalanceWithTiersAndInstanceAssignments()
+ throws Exception {
+ int numServers = 3;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(
+ "replicaAssignment" + NO_TIER_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + i, false);
+ }
+ _helixResourceManager.createServerTenant(
+ new Tenant(TenantRole.SERVER, "replicaAssignment" + NO_TIER_NAME, numServers, numServers, 0));
+
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+ .setServerTenant("replicaAssignment" + NO_TIER_NAME).build();
+ // Create the table
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME, SEGMENT_NAME_PREFIX + i,
+ nowInDays), null);
+ }
+ Map<String, Map<String, String>> oldSegmentAssignment =
+ _helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // add 6 nodes tierA
+ for (int i = 0; i < 6; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(
+ "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + i, false);
+ }
+ _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, "replicaAssignment" + TIER_A_NAME, 6, 6, 0));
+ // rebalance is NOOP and no change in assignment caused by new instances
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // add tier config
+ tableConfig.setTierConfigsList(Lists.newArrayList(
+ new TierConfig(TIER_A_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "0d", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, "replicaAssignment" + TIER_A_NAME + "_OFFLINE", null, null)));
+ _helixResourceManager.updateTableConfig(tableConfig);
+
+ // rebalance should change assignment
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+ // check that segments have moved to tier a
+ Map<String, Map<String, String>> tierSegmentAssignment = rebalanceResult.getSegmentAssignment();
+ for (Map.Entry<String, Map<String, String>> entry : tierSegmentAssignment.entrySet()) {
+ Map<String, String> instanceStateMap = entry.getValue();
+ for (String instance : instanceStateMap.keySet()) {
+ assertTrue(instance.startsWith("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX));
+ }
+ }
+
+ // Test rebalance with tier instance assignment
+ InstanceTagPoolConfig tagPoolConfig =
+ new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME), false, 0,
+ null);
+ InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false);
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ _helixResourceManager.updateTableConfig(tableConfig);
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
+
+ InstancePartitions instancePartitions = rebalanceResult.getTierInstanceAssignment().get(TIER_A_NAME);
+
+ // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
+ // [i2, i3, i4, i5, i0, i1]
+ // r0 r1 r2 r0 r1 r2
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 2,
+ "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 3,
+ "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 4,
+ "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 1));
+
+ // The assignment is based on replica-group 0 and mirrored to all the replica-groups, so server of index 0, 1, 5
+ // should have the same segments assigned, and server of index 2, 3, 4 should have the same segments assigned, each
+ // with 5 segments
+ Map<String, Map<String, String>> newSegmentAssignment = rebalanceResult.getSegmentAssignment();
+ int numSegmentsOnServer0 = 0;
+ for (int i = 0; i < numSegments; i++) {
+ String segmentName = SEGMENT_NAME_PREFIX + i;
+ Map<String, String> instanceStateMap = newSegmentAssignment.get(segmentName);
+ assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+ if (instanceStateMap.containsKey("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 0)) {
+ numSegmentsOnServer0++;
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 0),
+ ONLINE);
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 1),
+ ONLINE);
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 5),
+ ONLINE);
+ } else {
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 2),
+ ONLINE);
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 3),
+ ONLINE);
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + 4),
+ ONLINE);
+ }
+ }
+ assertEquals(numSegmentsOnServer0, numSegments / 2);
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 046c0175e7..361999a881 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -595,7 +595,8 @@ public final class TableConfigUtils {
return;
}
for (InstancePartitionsType instancePartitionsType : tableConfig.getInstancePartitionsMap().keySet()) {
- Preconditions.checkState(!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType),
+ Preconditions.checkState(
+ !tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()),
String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s",
instancePartitionsType));
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 88a829d846..ee14ef2b4e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1724,10 +1724,10 @@ public class TableConfigUtilsTest {
// Call validate with a table-config with instance partitions set but not instance assignment config
TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithInstancePartitionsMap);
- TableConfig invalidTableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
- .setInstanceAssignmentConfigMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, instanceAssignmentConfig))
+ TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
+ .setInstanceAssignmentConfigMap(
+ ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig))
.build();
try {
// Call validate with instance partitions and config set for the same type
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index bae7b7c798..91a54bc942 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -89,7 +89,7 @@ public class TableConfig extends BaseJsonConfig {
private TableTaskConfig _taskConfig;
private RoutingConfig _routingConfig;
private QueryConfig _queryConfig;
- private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
+ private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
@JsonPropertyDescription(value = "Point to an existing instance partitions")
private Map<InstancePartitionsType, String> _instancePartitionsMap;
@@ -128,7 +128,7 @@ public class TableConfig extends BaseJsonConfig {
@JsonProperty(ROUTING_CONFIG_KEY) @Nullable RoutingConfig routingConfig,
@JsonProperty(QUERY_CONFIG_KEY) @Nullable QueryConfig queryConfig,
@JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap,
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap,
@JsonProperty(FIELD_CONFIG_LIST_KEY) @Nullable List<FieldConfig> fieldConfigList,
@JsonProperty(UPSERT_CONFIG_KEY) @Nullable UpsertConfig upsertConfig,
@JsonProperty(DEDUP_CONFIG_KEY) @Nullable DedupConfig dedupConfig,
@@ -267,12 +267,12 @@ public class TableConfig extends BaseJsonConfig {
@JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY)
@Nullable
- public Map<InstancePartitionsType, InstanceAssignmentConfig> getInstanceAssignmentConfigMap() {
+ public Map<String, InstanceAssignmentConfig> getInstanceAssignmentConfigMap() {
return _instanceAssignmentConfigMap;
}
public void setInstanceAssignmentConfigMap(
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
_instanceAssignmentConfigMap = instanceAssignmentConfigMap;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index a687a3eafd..2a00acc28f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -401,7 +401,7 @@ public class ControllerRequestURLBuilder {
return url;
}
- public String forInstancePartitions(String tableName, @Nullable InstancePartitionsType instancePartitionsType) {
+ public String forInstancePartitions(String tableName, @Nullable String instancePartitionsType) {
String url = StringUtil.join("/", _baseUrl, "tables", tableName, "instancePartitions");
if (instancePartitionsType != null) {
url += "?type=" + instancePartitionsType;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 4efa7db452..4916668eed 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -109,7 +109,7 @@ public class TableConfigBuilder {
private TableTaskConfig _taskConfig;
private RoutingConfig _routingConfig;
private QueryConfig _queryConfig;
- private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
+ private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
private Map<InstancePartitionsType, String> _instancePartitionsMap;
private Map<String, SegmentAssignmentConfig> _segmentAssignmentConfigMap;
private List<FieldConfig> _fieldConfigList;
@@ -344,7 +344,7 @@ public class TableConfigBuilder {
}
public TableConfigBuilder setInstanceAssignmentConfigMap(
- Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
_instanceAssignmentConfigMap = instanceAssignmentConfigMap;
return this;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org