You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/11 21:17:24 UTC
[pinot] branch master updated: [feature] Consider tierConfigs when assigning new offline segment (#10746)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 78c05f6301 [feature] Consider tierConfigs when assigning new offline segment (#10746)
78c05f6301 is described below
commit 78c05f6301e4d9a537de8e537ed2b0236400d353
Author: Alexey Pavlenko <13...@users.noreply.github.com>
AuthorDate: Fri May 12 00:17:18 2023 +0300
[feature] Consider tierConfigs when assigning new offline segment (#10746)
* Consider tierConfigs when assigning new offline segment
* address remarks
* cleanups
* more nits
* fix test group
* untag servers of cold tenant
---------
Co-authored-by: Alexey Pavlenko <al...@cloudkitchens.com>
---
.../pinot/common/utils/config/TierConfigUtils.java | 30 ++++
.../apache/pinot/controller/ControllerConf.java | 11 ++
.../helix/core/PinotHelixResourceManager.java | 32 +++-
.../PinotHelixResourceManagerAssignmentTest.java | 167 +++++++++++++++++++++
4 files changed, 237 insertions(+), 3 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
index 835dddf67a..d6c8eccf60 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
@@ -24,10 +24,14 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.tier.FixedTierSegmentSelector;
+import org.apache.pinot.common.tier.PinotServerTierStorage;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.tier.TierSegmentSelector;
@@ -63,6 +67,32 @@ public final class TierConfigUtils {
return getDataDirForTier(tableConfig, tierName, Collections.emptyMap());
}
+  /**
+   * Computes default instance partitions for a segment from the first configured tier whose segment selector
+   * selects it.
+   *
+   * <p>Tiers are evaluated in the given order, so callers are expected to pass tiers already sorted by precedence
+   * (e.g. the result of {@link #getSortedTiersForStorageType}). Only tiers backed by
+   * {@link PinotServerTierStorage} carry a server tag that can be turned into instance partitions; if the first
+   * matching tier uses any other storage type, {@code null} is returned instead of failing with a
+   * {@link ClassCastException}.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param segmentName name of the segment being assigned
+   * @param sortedTiers tiers in precedence order; may be {@code null} or empty
+   * @param helixManager Helix manager passed through to
+   *        {@link InstancePartitionsUtils#computeDefaultInstancePartitionsForTag}
+   * @return instance partitions for the first matching server-storage tier, or {@code null} if no tier selects the
+   *         segment or the matching tier is not backed by Pinot server storage
+   */
+  @Nullable
+  public static InstancePartitions getTieredInstancePartitionsForSegment(String tableNameWithType,
+      String segmentName, @Nullable List<Tier> sortedTiers, HelixManager helixManager) {
+    if (CollectionUtils.isEmpty(sortedTiers)) {
+      return null;
+    }
+
+    for (Tier tier : sortedTiers) {
+      if (!tier.getSegmentSelector().selectSegment(tableNameWithType, segmentName)) {
+        continue;
+      }
+      // The first matching tier wins. Only Pinot server storage maps to a server tag, so guard the cast rather
+      // than letting a tier with another storage type fail segment assignment with a ClassCastException.
+      if (!(tier.getStorage() instanceof PinotServerTierStorage)) {
+        return null;
+      }
+      PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage();
+      return InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(helixManager, tableNameWithType,
+          tier.getName(), storage.getServerTag());
+    }
+
+    // No tier selects this segment
+    return null;
+  }
+
public static String getDataDirForTier(TableConfig tableConfig, String tierName,
Map<String, Map<String, String>> instanceTierConfigs) {
String tableNameWithType = tableConfig.getTableName();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 61141aaa31..d12b40c499 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -76,6 +76,9 @@ public class ControllerConf extends PinotConfiguration {
public static final String CONTROLLER_RESOURCE_PACKAGES = "controller.restlet.api.resource.packages";
public static final String DEFAULT_CONTROLLER_RESOURCE_PACKAGES = "org.apache.pinot.controller.api.resources";
+ // When enabled, the controller consults a table's tierConfigs while assigning a newly added segment: the segment's
+ // target tier is recorded, and for offline tables a matching tier overrides the default instance partitions.
+ // Defaults to false (see tieredSegmentAssignmentEnabled()).
+ public static final String CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT = "controller.segment.enableTieredAssignment";
+
public enum ControllerMode {
DUAL, PINOT_ONLY, HELIX_ONLY
}
@@ -671,6 +674,14 @@ public class ControllerConf extends PinotConfiguration {
Integer.toString(segmentRelocatorFrequencyInSeconds));
}
+  /**
+   * Tells whether table tier configs should be taken into account when a new segment is assigned.
+   *
+   * @return the value of {@code controller.segment.enableTieredAssignment}, defaulting to {@code false}
+   */
+  public boolean tieredSegmentAssignmentEnabled() {
+    final boolean defaultEnabled = false;
+    return getProperty(CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT, defaultEnabled);
+  }
+
+ public void setTieredSegmentAssignmentEnabled(boolean enabled) {
+ setProperty(CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT, enabled);
+ }
+
public boolean tenantIsolationEnabled() {
return getProperty(CLUSTER_TENANT_ISOLATION_ENABLE, true);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5a7e29a829..9142b93792 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -114,6 +114,7 @@ import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.RevertReplaceSegmentsRequest;
import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.BcryptUtils;
import org.apache.pinot.common.utils.HashUtil;
@@ -217,6 +218,7 @@ public class PinotHelixResourceManager {
private final boolean _enableBatchMessageMode;
private final boolean _allowHLCTables;
private final int _deletedSegmentsRetentionInDays;
+ private final boolean _enableTieredSegmentAssignment;
private HelixManager _helixZkManager;
private HelixAdmin _helixAdmin;
@@ -230,7 +232,7 @@ public class PinotHelixResourceManager {
public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean allowHLCTables,
- int deletedSegmentsRetentionInDays, LineageManager lineageManager) {
+ int deletedSegmentsRetentionInDays, boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
_dataDir = dataDir;
@@ -238,6 +240,7 @@ public class PinotHelixResourceManager {
_enableBatchMessageMode = enableBatchMessageMode;
_deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
_allowHLCTables = allowHLCTables;
+ _enableTieredSegmentAssignment = enableTieredSegmentAssignment;
_instanceAdminEndpointCache =
CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
.build(new CacheLoader<String, String>() {
@@ -259,7 +262,7 @@ public class PinotHelixResourceManager {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
controllerConf.getHLCTablesAllowed(), controllerConf.getDeletedSegmentsRetentionInDays(),
- LineageManagerFactory.create(controllerConf));
+ controllerConf.tieredSegmentAssignmentEnabled(), LineageManagerFactory.create(controllerConf));
}
/**
@@ -2268,10 +2271,32 @@ public class PinotHelixResourceManager {
try {
TableConfig tableConfig = getTableConfig(tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+ // Initialize tier information only in case direct tier assignment is configured
+ if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+ List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+
+ // Update segment tier to support direct assignment for multiple data directories
+ updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+
+ InstancePartitions tierInstancePartitions =
+ TierConfigUtils.getTieredInstancePartitionsForSegment(tableNameWithType, segmentName, sortedTiers,
+ _helixZkManager);
+ if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ // Override instance partitions for offline table
+ LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
+ tierInstancePartitions, segmentName, tableNameWithType);
+ instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
+ }
+ }
+
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
synchronized (getTableUpdaterLock(tableNameWithType)) {
+ Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
assert idealState != null;
Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
@@ -2280,7 +2305,8 @@ public class PinotHelixResourceManager {
tableNameWithType);
} else {
List<String> assignedInstances =
- segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
+ finalInstancePartitionsMap);
LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
tableNameWithType);
currentAssignment.put(segmentName,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java
new file mode 100644
index 0000000000..2c3b92ebc5
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.tier.TierFactory;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
+import org.apache.pinot.spi.config.tenant.Tenant;
+import org.apache.pinot.spi.config.tenant.TenantRole;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Verifies that, with {@link ControllerConf#CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT} enabled, a new offline
+ * segment selected by a tier config is recorded with that tier and assigned to the tier's (cold) server tenant.
+ */
+@Test(groups = "stateless")
+public class PinotHelixResourceManagerAssignmentTest extends ControllerTest {
+  private static final int NUM_BROKER_INSTANCES = 3;
+  private static final int NUM_OFFLINE_SERVER_INSTANCES = 2;
+  private static final int NUM_OFFLINE_COLD_SERVER_INSTANCES = 2;
+  private static final int NUM_REALTIME_SERVER_INSTANCES = 2;
+  private static final int NUM_SERVER_INSTANCES =
+      NUM_OFFLINE_SERVER_INSTANCES + NUM_REALTIME_SERVER_INSTANCES + NUM_OFFLINE_COLD_SERVER_INSTANCES;
+  private static final String BROKER_TENANT_NAME = "brokerTenant";
+  private static final String SERVER_TENANT_NAME = "serverTenant";
+  private static final String SERVER_COLD_TENANT_NAME = "coldServerTenant";
+
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+
+  // The fixed-segment tier selects exactly this segment, so the tier config and the uploaded segment share the name
+  private static final String TIER_NAME = "tier1";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    // Enable the feature under test
+    properties.put(ControllerConf.CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT, true);
+    properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
+    startController(properties);
+
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, false);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, false);
+
+    resetBrokerTags();
+    resetServerTags();
+  }
+
+  // Returns every broker of the broker tenant to the untagged pool
+  private void untagBrokers() {
+    for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
+      _helixResourceManager.updateInstanceTags(brokerInstance, Helix.UNTAGGED_BROKER_INSTANCE, false);
+    }
+  }
+
+  // Re-creates the broker tenant from all brokers, asserting that none remain untagged afterwards
+  private void resetBrokerTags() {
+    untagBrokers();
+    assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), NUM_BROKER_INSTANCES);
+    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, NUM_BROKER_INSTANCES, 0, 0);
+    _helixResourceManager.createBrokerTenant(brokerTenant);
+    assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), 0);
+  }
+
+  // Returns every server of both the default and the cold tenant to the untagged pool
+  private void untagServers() {
+    for (String serverInstance : _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME)) {
+      _helixResourceManager.updateInstanceTags(serverInstance, Helix.UNTAGGED_SERVER_INSTANCE, false);
+    }
+
+    for (String serverInstance : _helixResourceManager.getAllInstancesForServerTenant(SERVER_COLD_TENANT_NAME)) {
+      _helixResourceManager.updateInstanceTags(serverInstance, Helix.UNTAGGED_SERVER_INSTANCE, false);
+    }
+  }
+
+  // Splits the servers into a default tenant (offline + realtime) and an offline-only cold tenant
+  private void resetServerTags() {
+    untagServers();
+    assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), NUM_SERVER_INSTANCES);
+
+    // Create default tenant
+    Tenant serverTenant =
+        new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_SERVER_INSTANCES - NUM_OFFLINE_COLD_SERVER_INSTANCES,
+            NUM_OFFLINE_SERVER_INSTANCES, NUM_REALTIME_SERVER_INSTANCES);
+    _helixResourceManager.createServerTenant(serverTenant);
+
+    // Create cold tenant
+    Tenant coldTenant =
+        new Tenant(TenantRole.SERVER, SERVER_COLD_TENANT_NAME, NUM_OFFLINE_COLD_SERVER_INSTANCES,
+            NUM_OFFLINE_COLD_SERVER_INSTANCES, 0);
+    _helixResourceManager.createServerTenant(coldTenant);
+
+    assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), 0);
+  }
+
+  @Test
+  public void testAssignTargetTier()
+      throws Exception {
+    String coldOfflineServerTag = SERVER_COLD_TENANT_NAME + "_OFFLINE";
+    TierConfig tierConfig =
+        new TierConfig(TIER_NAME, TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null,
+            Collections.singletonList(SEGMENT_NAME), TierFactory.PINOT_SERVER_STORAGE_TYPE, coldOfflineServerTag,
+            null, null);
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
+            .setTierConfigList(Collections.singletonList(tierConfig)).setServerTenant(SERVER_TENANT_NAME).build();
+    waitForEVToDisappear(tableConfig.getTableName());
+    _helixResourceManager.addTable(tableConfig);
+
+    ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, new SegmentZKMetadata(SEGMENT_NAME));
+    _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME), "downloadUrl");
+
+    // The segment's target tier must be recorded in its ZK metadata
+    List<SegmentZKMetadata> retrievedSegmentsZKMetadata =
+        _helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
+    assertEquals(retrievedSegmentsZKMetadata.size(), 1);
+    SegmentZKMetadata retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
+    assertEquals(retrievedSegmentZKMetadata.getSegmentName(), SEGMENT_NAME);
+    assertEquals(retrievedSegmentZKMetadata.getTier(), TIER_NAME);
+
+    // The ideal state must contain exactly this segment, assigned to a single instance
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, OFFLINE_TABLE_NAME);
+    assertNotNull(idealState);
+    Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+    assertEquals(currentAssignment.size(), 1);
+    Map<String, String> instanceStateMap = currentAssignment.get(SEGMENT_NAME);
+    assertNotNull(instanceStateMap, "Segment " + SEGMENT_NAME + " is missing from the ideal state");
+    assertEquals(instanceStateMap.size(), 1);
+
+    // The assigned instance must belong to the cold tenant rather than the table's default server tenant
+    String coldServerName = instanceStateMap.keySet().iterator().next();
+    InstanceConfig coldServerConfig = HelixHelper.getInstanceConfig(_helixManager, coldServerName);
+    assertNotNull(coldServerConfig);
+    assertTrue(coldServerConfig.containsTag(coldOfflineServerTag),
+        "Instance " + coldServerName + " is not tagged with " + coldOfflineServerTag);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org