You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/11 21:17:24 UTC

[pinot] branch master updated: [feature] Consider tierConfigs when assigning new offline segment (#10746)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c05f6301 [feature] Consider tierConfigs when assigning new offline segment (#10746)
78c05f6301 is described below

commit 78c05f6301e4d9a537de8e537ed2b0236400d353
Author: Alexey Pavlenko <13...@users.noreply.github.com>
AuthorDate: Fri May 12 00:17:18 2023 +0300

    [feature] Consider tierConfigs when assigning new offline segment (#10746)
    
    * Consider tierConfigs when assigning new offline segment
    
    * address remarks
    
    * cleanups
    
    * more nits
    
    * fix test group
    
    * untag servers of cold tenant
    
    ---------
    
    Co-authored-by: Alexey Pavlenko <al...@cloudkitchens.com>
---
 .../pinot/common/utils/config/TierConfigUtils.java |  30 ++++
 .../apache/pinot/controller/ControllerConf.java    |  11 ++
 .../helix/core/PinotHelixResourceManager.java      |  32 +++-
 .../PinotHelixResourceManagerAssignmentTest.java   | 167 +++++++++++++++++++++
 4 files changed, 237 insertions(+), 3 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
index 835dddf67a..d6c8eccf60 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
@@ -24,10 +24,14 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.tier.FixedTierSegmentSelector;
+import org.apache.pinot.common.tier.PinotServerTierStorage;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.tier.TierSegmentSelector;
@@ -63,6 +67,32 @@ public final class TierConfigUtils {
     return getDataDirForTier(tableConfig, tierName, Collections.emptyMap());
   }
 
+  /**
+   * Consider configured tiers and compute default instance partitions for the segment
+   *
+   * @return InstancePartitions if one can be derived from the given sorted tiers, null otherwise
+   */
+  @Nullable
+  public static InstancePartitions getTieredInstancePartitionsForSegment(String tableNameWithType,
+      String segmentName, @Nullable List<Tier> sortedTiers, HelixManager helixManager) {
+    if (CollectionUtils.isEmpty(sortedTiers)) {
+      return null;
+    }
+
+    // Find first applicable tier
+    for (Tier tier : sortedTiers) {
+      if (tier.getSegmentSelector().selectSegment(tableNameWithType, segmentName)) {
+        // Compute default instance partitions
+        PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage();
+        return InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(helixManager, tableNameWithType,
+            tier.getName(), storage.getServerTag());
+      }
+    }
+
+    // Tier not found
+    return null;
+  }
+
   public static String getDataDirForTier(TableConfig tableConfig, String tierName,
       Map<String, Map<String, String>> instanceTierConfigs) {
     String tableNameWithType = tableConfig.getTableName();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 61141aaa31..d12b40c499 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -76,6 +76,9 @@ public class ControllerConf extends PinotConfiguration {
   public static final String CONTROLLER_RESOURCE_PACKAGES = "controller.restlet.api.resource.packages";
   public static final String DEFAULT_CONTROLLER_RESOURCE_PACKAGES = "org.apache.pinot.controller.api.resources";
 
+  // Consider tierConfigs when assigning new offline segment
+  public static final String CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT = "controller.segment.enableTieredAssignment";
+
   public enum ControllerMode {
     DUAL, PINOT_ONLY, HELIX_ONLY
   }
@@ -671,6 +674,14 @@ public class ControllerConf extends PinotConfiguration {
         Integer.toString(segmentRelocatorFrequencyInSeconds));
   }
 
+  public boolean tieredSegmentAssignmentEnabled() {
+    return getProperty(CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT, false);
+  }
+
+  public void setTieredSegmentAssignmentEnabled(boolean enabled) {
+    setProperty(CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT, enabled);
+  }
+
   public boolean tenantIsolationEnabled() {
     return getProperty(CLUSTER_TENANT_ISOLATION_ENABLE, true);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5a7e29a829..9142b93792 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -114,6 +114,7 @@ import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
 import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
 import org.apache.pinot.common.restlet.resources.RevertReplaceSegmentsRequest;
 import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.tier.TierSegmentSelector;
 import org.apache.pinot.common.utils.BcryptUtils;
 import org.apache.pinot.common.utils.HashUtil;
@@ -217,6 +218,7 @@ public class PinotHelixResourceManager {
   private final boolean _enableBatchMessageMode;
   private final boolean _allowHLCTables;
   private final int _deletedSegmentsRetentionInDays;
+  private final boolean _enableTieredSegmentAssignment;
 
   private HelixManager _helixZkManager;
   private HelixAdmin _helixAdmin;
@@ -230,7 +232,7 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean allowHLCTables,
-      int deletedSegmentsRetentionInDays, LineageManager lineageManager) {
+      int deletedSegmentsRetentionInDays, boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
@@ -238,6 +240,7 @@ public class PinotHelixResourceManager {
     _enableBatchMessageMode = enableBatchMessageMode;
     _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
     _allowHLCTables = allowHLCTables;
+    _enableTieredSegmentAssignment = enableTieredSegmentAssignment;
     _instanceAdminEndpointCache =
         CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
             .build(new CacheLoader<String, String>() {
@@ -259,7 +262,7 @@ public class PinotHelixResourceManager {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
         controllerConf.getHLCTablesAllowed(), controllerConf.getDeletedSegmentsRetentionInDays(),
-        LineageManagerFactory.create(controllerConf));
+        controllerConf.tieredSegmentAssignmentEnabled(), LineageManagerFactory.create(controllerConf));
   }
 
   /**
@@ -2268,10 +2271,32 @@ public class PinotHelixResourceManager {
     try {
       TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
           fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+      // Initialize tier information only when direct tier assignment is configured
+      if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+
+        // Update segment tier to support direct assignment for multiple data directories
+        updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+
+        InstancePartitions tierInstancePartitions =
+            TierConfigUtils.getTieredInstancePartitionsForSegment(tableNameWithType, segmentName, sortedTiers,
+                _helixZkManager);
+        if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+          // Override instance partitions for offline table
+          LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
+              tierInstancePartitions, segmentName, tableNameWithType);
+          instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
+        }
+      }
+
       SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
       synchronized (getTableUpdaterLock(tableNameWithType)) {
+        Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
         HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
           assert idealState != null;
           Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
@@ -2280,7 +2305,8 @@ public class PinotHelixResourceManager {
                 tableNameWithType);
           } else {
             List<String> assignedInstances =
-                segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
+                segmentAssignment.assignSegment(segmentName, currentAssignment,
+                    finalInstancePartitionsMap);
             LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
                 tableNameWithType);
             currentAssignment.put(segmentName,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java
new file mode 100644
index 0000000000..2c3b92ebc5
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerAssignmentTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.tier.TierFactory;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
+import org.apache.pinot.spi.config.tenant.Tenant;
+import org.apache.pinot.spi.config.tenant.TenantRole;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+@Test(groups = "stateless")
+public class PinotHelixResourceManagerAssignmentTest extends ControllerTest {
+  private static final int NUM_BROKER_INSTANCES = 3;
+  private static final int NUM_OFFLINE_SERVER_INSTANCES = 2;
+  private static final int NUM_OFFLINE_COLD_SERVER_INSTANCES = 2;
+  private static final int NUM_REALTIME_SERVER_INSTANCES = 2;
+  private static final int NUM_SERVER_INSTANCES =
+      NUM_OFFLINE_SERVER_INSTANCES + NUM_REALTIME_SERVER_INSTANCES + NUM_OFFLINE_COLD_SERVER_INSTANCES;
+  private static final String BROKER_TENANT_NAME = "brokerTenant";
+  private static final String SERVER_TENANT_NAME = "serverTenant";
+  private static final String SERVER_COLD_TENANT_NAME = "coldServerTenant";
+
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    properties.put(ControllerConf.CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT, true);
+    properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
+    startController(properties);
+
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, false);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, false);
+
+    resetBrokerTags();
+    resetServerTags();
+  }
+
+  private void untagBrokers() {
+    for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
+      _helixResourceManager.updateInstanceTags(brokerInstance, Helix.UNTAGGED_BROKER_INSTANCE, false);
+    }
+  }
+
+  private void resetBrokerTags() {
+    untagBrokers();
+    assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), NUM_BROKER_INSTANCES);
+    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, NUM_BROKER_INSTANCES, 0, 0);
+    _helixResourceManager.createBrokerTenant(brokerTenant);
+    assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), 0);
+  }
+
+  private void untagServers() {
+    for (String serverInstance : _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME)) {
+      _helixResourceManager.updateInstanceTags(serverInstance, Helix.UNTAGGED_SERVER_INSTANCE, false);
+    }
+
+    for (String serverInstance : _helixResourceManager.getAllInstancesForServerTenant(SERVER_COLD_TENANT_NAME)) {
+      _helixResourceManager.updateInstanceTags(serverInstance, Helix.UNTAGGED_SERVER_INSTANCE, false);
+    }
+  }
+
+  private void resetServerTags() {
+    untagServers();
+    assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), NUM_SERVER_INSTANCES);
+
+    // Create default tenant
+    Tenant serverTenant =
+        new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_SERVER_INSTANCES - NUM_OFFLINE_COLD_SERVER_INSTANCES,
+            NUM_OFFLINE_SERVER_INSTANCES, NUM_REALTIME_SERVER_INSTANCES);
+    _helixResourceManager.createServerTenant(serverTenant);
+
+    // Create cold tenant
+    Tenant coldTenant =
+        new Tenant(TenantRole.SERVER, SERVER_COLD_TENANT_NAME, NUM_OFFLINE_COLD_SERVER_INSTANCES,
+            NUM_OFFLINE_COLD_SERVER_INSTANCES, 0);
+    _helixResourceManager.createServerTenant(coldTenant);
+
+    assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), 0);
+  }
+
+  @Test
+  public void testAssignTargetTier()
+      throws Exception {
+    String coldOfflineServerTag = SERVER_COLD_TENANT_NAME + "_OFFLINE";
+    TierConfig tierConfig =
+        new TierConfig("tier1", TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null, Collections.singletonList("testSegment"),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, coldOfflineServerTag, null, null);
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
+            .setTierConfigList(Collections.singletonList(tierConfig)).setServerTenant(SERVER_TENANT_NAME).build();
+    waitForEVToDisappear(tableConfig.getTableName());
+    _helixResourceManager.addTable(tableConfig);
+
+    String segmentName = "testSegment";
+    ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, new SegmentZKMetadata(segmentName));
+    _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, segmentName), "downloadUrl");
+
+    List<SegmentZKMetadata> retrievedSegmentsZKMetadata =
+        _helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
+    SegmentZKMetadata retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
+    assertEquals(retrievedSegmentZKMetadata.getSegmentName(), segmentName);
+    assertEquals(retrievedSegmentZKMetadata.getTier(), "tier1");
+
+    // Retrieve current assignment of the table and ensure segment's presence there
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, OFFLINE_TABLE_NAME);
+    assertNotNull(idealState);
+    Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+    assertTrue(currentAssignment.size() == 1 && currentAssignment.get(segmentName).size() == 1);
+
+    // Ensure that the server instance belongs to the cold tenant
+    String coldServerName = currentAssignment.get(segmentName).keySet().iterator().next();
+    InstanceConfig coldServerConfig = HelixHelper.getInstanceConfig(_helixManager, coldServerName);
+    assertTrue(coldServerConfig.containsTag(coldOfflineServerTag));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org