You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/05/27 00:20:03 UTC
[pinot] branch master updated: 10608: Changes for adding partitionColumn in replicaGroupPartitionConfig (#10656)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 51bf75efa6 10608: Changes for adding partitionColumn in replicaGroupPartitionConfig (#10656)
51bf75efa6 is described below
commit 51bf75efa65cbe8bd8b497eb20e34869205d74e8
Author: Abhishek Sharma <ab...@spothero.com>
AuthorDate: Fri May 26 20:19:57 2023 -0400
10608: Changes for adding partitionColumn in replicaGroupPartitionConfig (#10656)
---
.../assignment/InstanceAssignmentConfigUtils.java | 4 +-
.../common/utils/config/TableConfigUtils.java | 25 ++++++++++
.../common/utils/config/TableConfigSerDeTest.java | 2 +-
.../common/utils/config/TableConfigUtilsTest.java | 25 ++++++++++
.../assignment/segment/BaseSegmentAssignment.java | 6 +--
.../ReplicaGroupSegmentAssignmentStrategy.java | 6 +--
...anceAssignmentRestletResourceStatelessTest.java | 6 +--
.../instance/InstanceAssignmentTest.java | 58 ++++++++++++----------
.../TableRebalancerClusterStatelessTest.java | 4 +-
.../segment/local/utils/TableConfigUtils.java | 20 ++++++++
.../segment/local/utils/TableConfigUtilsTest.java | 42 ++++++++++++++++
.../InstanceReplicaGroupPartitionConfig.java | 14 +++++-
12 files changed, 169 insertions(+), 43 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index b571918c0c..b37429c527 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -114,12 +114,12 @@ public class InstanceAssignmentConfigUtils {
Preconditions.checkState(numPartitions > 0, "Number of partitions for column: %s is not properly configured",
partitionColumn);
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, numPartitions,
- replicaGroupStrategyConfig.getNumInstancesPerPartition(), minimizeDataMovement);
+ replicaGroupStrategyConfig.getNumInstancesPerPartition(), minimizeDataMovement, partitionColumn);
} else {
// If partition column is not configured, use replicaGroupStrategyConfig.getNumInstancesPerPartition() as
// number of instances per replica-group for backward-compatibility
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
- replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement);
+ replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null);
}
return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 0546dcb66a..0a59696b10 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.DimensionTableConfig;
@@ -38,6 +39,7 @@ import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -444,4 +446,27 @@ public class TableConfigUtils {
return hasPreConfiguredInstancePartitions(tableConfig)
&& tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType);
}
+
+  /**
+   * Get the partition column from the instance assignment configs, falling back to ReplicaGroupStrategyConfig.
+   * @param tableConfig table config to read the partition column from
+   * @return partition column, or null if none is configured
+   */
+  public static String getPartitionColumn(TableConfig tableConfig) {
+    // Only scan the instance assignment configs when the map is neither null nor empty
+    if (!MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) {
+      for (InstanceAssignmentConfig instanceAssignmentConfig : tableConfig.getInstanceAssignmentConfigMap().values()) {
+        // Return the first non-empty partitionColumn found in an InstanceReplicaGroupPartitionConfig
+        if (StringUtils.isNotEmpty(instanceAssignmentConfig.getReplicaGroupPartitionConfig().getPartitionColumn())) {
+          return instanceAssignmentConfig.getReplicaGroupPartitionConfig().getPartitionColumn();
+        }
+      }
+    }
+
+    // For backward-compatibility: if no InstanceReplicaGroupPartitionConfig has a partitionColumn,
+    // fall back to the partitionColumn in ReplicaGroupStrategyConfig
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
+        tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
+    return replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
+  }
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 6436058b59..eb1d31bab0 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -211,7 +211,7 @@ public class TableConfigSerDeTest {
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null),
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
- new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false));
+ new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null));
TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
index 3e00e9fdeb..9d1f86dc23 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -48,6 +49,7 @@ import org.testng.annotations.Test;
public class TableConfigUtilsTest {
private static final String TABLE_NAME = "testTable";
+ private static final String PARTITION_COLUMN = "partitionColumn";
/**
* Test the {@link TableConfigUtils#convertFromLegacyTableConfig(TableConfig)} utility.
@@ -140,6 +142,29 @@ public class TableConfigUtilsTest {
Assert.assertEquals(tierTblCfg, tableConfig);
}
+  @Test
+  public void testGetPartitionColumnWithoutAnyConfig() {
+    // No instanceAssignmentConfigMap is set on the builder, so no partition column is expected
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
+    Assert.assertNull(TableConfigUtils.getPartitionColumn(tableConfig));
+  }
+
+  @Test
+  public void testGetPartitionColumnWithReplicaGroupConfig() {
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
+        new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1);
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
+
+    // Set only the legacy ReplicaGroupStrategyConfig to exercise the backward-compatible fallback
+    SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
+    validationConfig.setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
+    tableConfig.setValidationConfig(validationConfig);
+
+    Assert.assertEquals(TableConfigUtils.getPartitionColumn(tableConfig), PARTITION_COLUMN);
+  }
+
/**
* Helper method to create a test StreamConfigs map.
* @return Map containing Stream Configs
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
index fc66bce53e..b89fd497be 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
@@ -28,9 +28,9 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -76,9 +76,7 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment {
_tableNameWithType = tableConfig.getTableName();
_tableConfig = tableConfig;
_replication = tableConfig.getReplication();
- ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
- tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
- _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
+ _partitionColumn = TableConfigUtils.getPartitionColumn(_tableConfig);
if (_partitionColumn == null) {
_logger.info("Initialized with replication: {} without partition column for table: {} ", _replication,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
index d5a4d0e027..3ff0807220 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
@@ -27,8 +27,8 @@ import java.util.Random;
import java.util.TreeMap;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -54,9 +54,7 @@ class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy
SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null");
_replication = tableConfig.getReplication();
- ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
- validationAndRetentionConfig.getReplicaGroupStrategyConfig();
- _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
+ _partitionColumn = TableConfigUtils.getPartitionColumn(_tableConfig);
if (_partitionColumn == null) {
LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
+ "with replication: {} without partition column for table: {} ", _replication, _tableName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index ef95da5135..570c59438c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -118,7 +118,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
// Add OFFLINE instance assignment config to the offline table config
InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig(
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
- new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
offlineTableConfig.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig));
_helixResourceManager.setExistingTableConfig(offlineTableConfig);
@@ -136,7 +136,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
// Add CONSUMING instance assignment config to the real-time table config
InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig(
new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
- new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
realtimeTableConfig.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig));
_helixResourceManager.setExistingTableConfig(realtimeTableConfig);
@@ -164,7 +164,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
null)));
InstanceAssignmentConfig tierInstanceAssignmentConfig = new InstanceAssignmentConfig(
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
- new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>();
instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig);
instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 53bd2da317..4335d80b14 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -350,7 +350,7 @@ public class InstanceAssignmentTest {
// Assign to 2 replica-groups so that each replica-group is assigned to one pool
int numReplicaGroups = numPools;
InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
- new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false);
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))).build();
@@ -448,7 +448,7 @@ public class InstanceAssignmentTest {
// Assign instances from 2 pools to 3 replica-groups
numReplicaGroups = numPools;
- replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false);
+ replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
@@ -477,7 +477,7 @@ public class InstanceAssignmentTest {
// Reset the number of replica groups to 2 and pools to 2.
numReplicaGroups = 2;
numPools = 2;
- replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
+ replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
@@ -554,7 +554,7 @@ public class InstanceAssignmentTest {
// Assign instances from 2 pools to 3 replica-groups
numReplicaGroups = 3;
- replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
+ replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
@@ -633,7 +633,7 @@ public class InstanceAssignmentTest {
// Reduce number of replica groups from 3 to 2.
numReplicaGroups = 2;
- replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true);
+ replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
@@ -760,7 +760,7 @@ public class InstanceAssignmentTest {
InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, false, 0, null);
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
- new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false);
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
@@ -849,7 +849,8 @@ public class InstanceAssignmentTest {
}
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
- replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false);
+ replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false, null
+ );
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
@@ -862,7 +863,8 @@ public class InstanceAssignmentTest {
}
// Enable replica-group
- replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false);
+ replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false, null
+ );
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
@@ -874,7 +876,7 @@ public class InstanceAssignmentTest {
assertEquals(e.getMessage(), "Number of replica-groups must be positive");
}
- replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false);
+ replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
@@ -887,7 +889,7 @@ public class InstanceAssignmentTest {
"Not enough qualified instances from pool: 0, cannot select 6 replica-groups from 5 instances");
}
- replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false);
+ replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
@@ -899,7 +901,7 @@ public class InstanceAssignmentTest {
assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
}
- replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false);
+ replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
@@ -912,7 +914,7 @@ public class InstanceAssignmentTest {
"Number of instances per partition: 3 must be smaller or equal to number of instances per replica-group: 2");
}
- replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false);
+ replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
@@ -948,7 +950,8 @@ public class InstanceAssignmentTest {
}
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
- new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0,
+ 0, false, null);
try {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
@@ -977,7 +980,8 @@ public class InstanceAssignmentTest {
}
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
replicaPartitionConfig =
- new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup,
+ 0, 0, false, null);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
@@ -1009,7 +1013,8 @@ public class InstanceAssignmentTest {
}
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
replicaPartitionConfig =
- new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
+ numInstancesPerReplicaGroup, 0, 0, false, null);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
@@ -1049,7 +1054,8 @@ public class InstanceAssignmentTest {
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
replicaPartitionConfig =
- new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, 0, 0, false);
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
+ numInstancesPerReplicaGroup, 0, 0, false, null);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
@@ -1087,7 +1093,7 @@ public class InstanceAssignmentTest {
// Assign to 3 replica-groups so that each replica-group is assigned 7 instances
InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, false);
+ numInstancesPerPartition, false, null);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
@@ -1159,7 +1165,7 @@ public class InstanceAssignmentTest {
// Assign to 3 replica-groups so that each replica-group is assigned 7 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, true);
+ numInstancesPerPartition, true, null);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
@@ -1237,7 +1243,7 @@ public class InstanceAssignmentTest {
// Assign to 3 replica-groups so that each replica-group is assigned 7 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, false);
+ numInstancesPerPartition, false, null);
String partitionColumnName = "partition";
SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(
Collections.singletonMap(partitionColumnName, new ColumnPartitionConfig("Modulo", numPartitionsSegment, null)));
@@ -1310,7 +1316,7 @@ public class InstanceAssignmentTest {
// Assign to 3 replica-groups so that each replica-group is assigned 3 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, false);
+ numInstancesPerPartition, false, null);
// Do not rotate for testing
InstanceConstraintConfig instanceConstraintConfig =
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
@@ -1367,7 +1373,7 @@ public class InstanceAssignmentTest {
// Assign to 3 replica-groups so that each replica-group is assigned 3 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, true);
+ numInstancesPerPartition, true, null);
// Do not rotate for testing
instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
tableConfig =
@@ -1433,7 +1439,7 @@ public class InstanceAssignmentTest {
// Assign to 3 replica-groups so that each replica-group is assigned 5 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, false);
+ numInstancesPerPartition, false, null);
// Do not rotate instance sequence in pool (for testing)
instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
// Do not rotate pool sequence (for testing)
@@ -1499,7 +1505,7 @@ public class InstanceAssignmentTest {
// Assign to 3 replica-groups so that each replica-group is assigned 5 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, true);
+ numInstancesPerPartition, true, null);
// Do not rotate instance sequence in pool (for testing)
instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
// Do not rotate pool sequence (for testing)
@@ -1571,7 +1577,7 @@ public class InstanceAssignmentTest {
// Assign to 3 replica-groups so that each replica-group is assigned 1 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, false);
+ numInstancesPerPartition, false, null);
// Do not rotate for testing
instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
tableConfig =
@@ -1621,7 +1627,7 @@ public class InstanceAssignmentTest {
// Assign to 6 replica-groups so that each replica-group is assigned 2 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, false);
+ numInstancesPerPartition, false, null);
// Do not rotate instance sequence in pool (for testing)
instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
// Do not rotate pool sequence (for testing)
@@ -1685,7 +1691,7 @@ public class InstanceAssignmentTest {
// Assign to 6 replica-groups so that each replica-group is assigned 2 instances
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions,
- numInstancesPerPartition, true);
+ numInstancesPerPartition, true, null);
// Do not rotate instance sequence in pool (for testing)
instanceConstraintConfig = new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
// Do not rotate pool sequence (for testing)
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 4b9e869dcf..91f27160c0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -195,7 +195,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
InstanceTagPoolConfig tagPoolConfig =
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null);
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
- new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false);
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
_helixResourceManager.updateTableConfig(tableConfig);
@@ -477,7 +477,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME), false, 0,
null);
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
- new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false);
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
_helixResourceManager.updateTableConfig(tableConfig);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index c679759b87..9ad82bc23c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -60,6 +60,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
@@ -141,6 +142,7 @@ public final class TableConfigUtils {
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
validateFieldConfigList(tableConfig.getFieldConfigList(), tableConfig.getIndexingConfig(), schema);
validateInstancePartitionsTypeMapConfig(tableConfig);
+ validatePartitionedReplicaGroupInstance(tableConfig);
if (!skipTypes.contains(ValidationType.UPSERT)) {
validateUpsertAndDedupConfig(tableConfig, schema);
validatePartialUpsertStrategies(tableConfig, schema);
@@ -605,6 +607,24 @@ public final class TableConfigUtils {
}
}
+  /**
+   * Rejects table configs that set both replicaGroupStrategyConfig (on the validation config) and
+   * replicaGroupPartitionConfig (on any entry of the instance assignment config map).
+   *
+   * <p>Validation fails because the table would ignore replicaGroupStrategyConfig when the
+   * replicaGroupPartitionConfig is already set. Nothing is checked when replicaGroupStrategyConfig is absent or the
+   * instance assignment config map is null or empty.
+   *
+   * @param tableConfig table config to validate
+   * @throws IllegalStateException if replicaGroupStrategyConfig is set and at least one instance assignment config
+   *     carries a non-null replicaGroupPartitionConfig
+   */
+  @VisibleForTesting
+  static void validatePartitionedReplicaGroupInstance(TableConfig tableConfig) {
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
+    if (tableConfig.getValidationConfig().getReplicaGroupStrategyConfig() == null
+        || MapUtils.isEmpty(instanceAssignmentConfigMap)) {
+      return;
+    }
+    for (Map.Entry<String, InstanceAssignmentConfig> entry : instanceAssignmentConfigMap.entrySet()) {
+      // Name the offending map key so the user knows which instance assignment config conflicts with the
+      // table-level replicaGroupStrategyConfig. The template is only formatted when the check fails.
+      Preconditions.checkState(entry.getValue().getReplicaGroupPartitionConfig() == null,
+          "Both replicaGroupStrategyConfig and replicaGroupPartitionConfig are provided "
+              + "(instance assignment config: %s)", entry.getKey());
+    }
+  }
+
/**
* Validates metrics aggregation when upsert config is enabled
* - Metrics aggregation cannot be enabled when Upsert Config is enabled.
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 963e23438e..138ae2db1a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -42,6 +43,7 @@ import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
@@ -1749,6 +1751,46 @@ public class TableConfigUtilsTest {
}
}
+  /**
+   * Exercises {@code TableConfigUtils.validatePartitionedReplicaGroupInstance} with three table configs: neither
+   * config set, only replicaGroupStrategyConfig set, and both replicaGroupStrategyConfig and
+   * replicaGroupPartitionConfig set (the only case expected to be rejected).
+   */
+  @Test
+  public void testValidatePartitionedReplicaGroupInstance() {
+    String partitionColumn = "testPartitionCol";
+    ReplicaGroupStrategyConfig strategyConfig = new ReplicaGroupStrategyConfig(partitionColumn, 2);
+
+    // Case 1: neither replicaGroupStrategyConfig nor replicaGroupPartitionConfig is set; must not throw.
+    TableConfig plainTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    TableConfigUtils.validatePartitionedReplicaGroupInstance(plainTableConfig);
+
+    // Case 2: replicaGroupStrategyConfig is set but there is no replicaGroupPartitionConfig; must not throw.
+    TableConfig strategyOnlyTableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    strategyOnlyTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(strategyConfig);
+    TableConfigUtils.validatePartitionedReplicaGroupInstance(strategyOnlyTableConfig);
+
+    // Case 3: both replicaGroupStrategyConfig and replicaGroupPartitionConfig are set; must be rejected.
+    InstanceReplicaGroupPartitionConfig partitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 2, 0, false, partitionColumn);
+    InstanceAssignmentConfig assignmentConfig = Mockito.mock(InstanceAssignmentConfig.class);
+    Mockito.when(assignmentConfig.getReplicaGroupPartitionConfig()).thenReturn(partitionConfig);
+    Map<String, InstanceAssignmentConfig> assignmentConfigMap =
+        ImmutableMap.of(TableType.OFFLINE.toString(), assignmentConfig);
+
+    TableConfig conflictingTableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(TABLE_NAME)
+        .setInstanceAssignmentConfigMap(assignmentConfigMap)
+        .build();
+    conflictingTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(strategyConfig);
+
+    try {
+      TableConfigUtils.validatePartitionedReplicaGroupInstance(conflictingTableConfig);
+      Assert.fail("Validation should have failed since both replicaGroupStrategyConfig "
+          + "and replicaGroupPartitionConfig are set");
+    } catch (IllegalStateException ignored) {
+      // Expected: the conflicting configuration is rejected.
+    }
+  }
+
private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
index 95102e77c4..adc72e8f1c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.config.table.assignment;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.config.BaseJsonConfig;
@@ -51,6 +52,10 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
+ "instances if not " + "specified")
private final int _numInstancesPerPartition;
+ @JsonPropertyDescription(
+ "Name of the column used for partition, if not provided table level replica group will be used")
+ private final String _partitionColumn;
+
private final boolean _minimizeDataMovement;
@JsonCreator
@@ -59,7 +64,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
@JsonProperty("numInstancesPerReplicaGroup") int numInstancesPerReplicaGroup,
@JsonProperty("numPartitions") int numPartitions,
@JsonProperty("numInstancesPerPartition") int numInstancesPerPartition,
- @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement) {
+ @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement,
+ @Nullable @JsonProperty("partitionColumn") String partitionColumn) {
_replicaGroupBased = replicaGroupBased;
_numInstances = numInstances;
_numReplicaGroups = numReplicaGroups;
@@ -67,6 +73,7 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
_numPartitions = numPartitions;
_numInstancesPerPartition = numInstancesPerPartition;
_minimizeDataMovement = minimizeDataMovement;
+ _partitionColumn = partitionColumn;
}
public boolean isReplicaGroupBased() {
@@ -96,4 +103,9 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
public boolean isMinimizeDataMovement() {
return _minimizeDataMovement;
}
+
+ /**
+ * Returns the partition column name given to the constructor, or {@code null} if none was provided.
+ */
+ @Nullable
+ public String getPartitionColumn() {
+ return _partitionColumn;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org