You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/08/06 00:48:46 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5793: Tiered storage

Jackie-Jiang commented on a change in pull request #5793:
URL: https://github.com/apache/incubator-pinot/pull/5793#discussion_r466068162



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) {
-    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
+    InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;
+    // rebalance tiers first
+    List<Map<String, Map<String, String>>> newTierAssignments = null;
+    if (tierInstancePartitionsMap != null && !tierInstancePartitionsMap.isEmpty()) {

Review comment:
       Use `InstanceAssignmentConfigUtils.shouldRelocateToTiers(tableConfig)` instead of checking the map to determine whether to relocate segments to tiers.

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
##########
@@ -50,6 +50,13 @@ public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
         .isRelocateCompletedSegments(tableConfig.getTenantConfig());
   }
 
+  /**
+   * Returns whether relocation of segments to tiers has been enabled for this table
+   */
+  public static boolean shouldRelocateToTiers(TableConfig tableConfig) {
+    return tableConfig.getTierConfigsList() != null && !tableConfig.getTierConfigsList().isEmpty();

Review comment:
       (nit)
   ```suggestion
       return CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList());
   ```

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
##########
@@ -50,6 +50,13 @@ public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
         .isRelocateCompletedSegments(tableConfig.getTenantConfig());
   }
 
+  /**
+   * Returns whether relocation of segments to tiers has been enabled for this table
+   */
+  public static boolean shouldRelocateToTiers(TableConfig tableConfig) {

Review comment:
       Consider moving this into a separate class `TierConfigUtils`?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TierConfig.java
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Config for the tiered storage and the segments which will move to that tier
+ */
+public class TierConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Name of the tier with format TIER<number>")
+  private final String _name;
+
+  @JsonPropertyDescription("The strategy for selecting segments")
+  private final String _segmentSelectorType;
+
+  @JsonPropertyDescription("For 'timeBased' segment selector, the period after which to select segments for this tier")
+  private final String _segmentAge;
+
+  @JsonPropertyDescription("The type of storage storage")
+  private final String _storageType;
+
+  @JsonPropertyDescription("For 'pinotServer' storageSelector, the tag with which to identify servers for this tier.")
+  private final String _serverTag;
+
+  // TODO: only "serverTag" is supported currently. In next iteration, "InstanceAssignmentConfig _instanceAssignmentConfig" will be added here
+
+  public TierConfig(@JsonProperty(value = "name", required = true) String name,
+      @JsonProperty(value = "segmentSelectorType", required = true) String segmentSelectorType,
+      @JsonProperty("segmentAge") @Nullable String segmentAge,
+      @JsonProperty(value = "storageType", required = true) String storageType,
+      @JsonProperty("serverTag") @Nullable String serverTag) {
+    _name = name;

Review comment:
       Use `Preconditions.checkArgument()` on all required (non-nullable) arguments to reject bad configs.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -136,9 +143,55 @@ private static void validateIngestionConfig(@Nullable IngestionConfig ingestionC
         }
         // TODO: remove this once we add support for derived columns/chained transform functions
         if (!Collections.disjoint(transformColumns, argumentColumns)) {
-          throw new IllegalStateException("Derived columns not supported yet. Cannot use a transform column as argument to another transform functions");
+          throw new IllegalStateException(
+              "Derived columns not supported yet. Cannot use a transform column as argument to another transform functions");
         }
       }
     }
   }
+
+  /**
+   * Validates the tier configs
+   * Checks for the right segmentSelectorType and its required properties
+   * Checks for the right storageType and its required properties
+   */
+  private static void validateTierConfigList(@Nullable List<TierConfig> tierConfigList) {
+    if (tierConfigList == null) {
+      return;
+    }
+
+    Set<String> tierNames = new HashSet<>();
+    for (TierConfig tierConfig : tierConfigList) {
+      String tierName = tierConfig.getName();
+      Preconditions.checkState(!tierName.isEmpty());
+      Preconditions.checkState(!tierNames.contains(tierName), "Tier name: %s already exists in tier configs", tierName);
+      tierNames.add(tierName);

Review comment:
       (nit)
   ```suggestion
         Preconditions.checkState(tierNames.add(tierName), "Tier name: %s already exists in tier configs", tierName);
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -82,6 +91,10 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
         tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
     _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
 
+    if (InstanceAssignmentConfigUtils.shouldRelocateToTiers(tableConfig)) {

Review comment:
       Move this calculation into `rebalanceTable()` because it is not needed for `assignSegment()`. Same for `RealtimeSegmentAssignment`.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) {
-    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
+    InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;
+    // rebalance tiers first
+    List<Map<String, Map<String, String>>> newTierAssignments = null;
+    if (tierInstancePartitionsMap != null && !tierInstancePartitionsMap.isEmpty()) {
+      LOGGER.info("Rebalancing tiers: {} for table: {} with bootstrap: {}", tierInstancePartitionsMap.keySet(),
+          _offlineTableName, bootstrap);
+
+      // get tier to segment assignment map i.e. current assignments split by tiers they are eligible for
+      SegmentAssignmentUtils.TierSegmentAssignment tierSegmentAssignment =
+          new SegmentAssignmentUtils.TierSegmentAssignment(_offlineTableName, _sortedTiers, currentAssignment);
+      Map<String, Map<String, Map<String, String>>> tierNameToSegmentAssignmentMap =
+          tierSegmentAssignment.getTierNameToSegmentAssignmentMap();
+
+      // for each tier, calculate new assignment using instancePartitions for that tier
+      newTierAssignments = new ArrayList<>(tierNameToSegmentAssignmentMap.size());
+      for (Map.Entry<String, Map<String, Map<String, String>>> entry : tierNameToSegmentAssignmentMap.entrySet()) {
+        String tierName = entry.getKey();
+        Map<String, Map<String, String>> tierCurrentAssignment = entry.getValue();
+
+        InstancePartitions tierInstancePartitions = tierInstancePartitionsMap.get(tierName);
+        Preconditions
+            .checkNotNull(tierInstancePartitions, "Failed to find instance partitions for tier: %s of table: %s",
+                tierName, _offlineTableName);
+        checkReplication(tierInstancePartitions);
+
+        LOGGER.info("Rebalancing tier: {} for table: {} with instance partitions: {}", tierName, _offlineTableName,
+            tierInstancePartitions);
+        newTierAssignments.add(reassignSegments(tierName, tierCurrentAssignment, tierInstancePartitions, bootstrap));
+      }
+
+      // rest of the operations should happen only on segments which were not already assigned as part of tiers
+      subsetAssignment = tierSegmentAssignment.getNonTierSegmentAssignment();
+    }
+
     LOGGER.info("Rebalancing table: {} with instance partitions: {}, bootstrap: {}", _offlineTableName,
-        instancePartitions, bootstrap);
-    checkReplication(instancePartitions);
+        offlineInstancePartitions, bootstrap);
+    checkReplication(offlineInstancePartitions);
+    Map<String, Map<String, String>> newAssignment =
+        reassignSegments(InstancePartitionsType.OFFLINE.toString(), subsetAssignment, offlineInstancePartitions,
+            bootstrap);
+
+    // add tier assignments, if available
+    if (CollectionUtils.isNotEmpty(newTierAssignments)) {
+      newTierAssignments.forEach(newAssignment::putAll);
+    }
+
+    LOGGER.info("Rebalanced table: {}, number of segments to be moved to each instance: {}", _offlineTableName,
+        SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment));
+    return newAssignment;
+  }
 
+  /**
+   * Rebalances segments in the current assignment using the instancePartitions and returns new assignment
+   */
+  private Map<String, Map<String, String>> reassignSegments(String instancePartitionType,
+      Map<String, Map<String, String>> currentSegmentAssignment, InstancePartitions instancePartitions,

Review comment:
       (nit) Rename to `currentAssignment` for conciseness?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/tier/TieredStorageRelocator.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.tier;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to run rebalancer in background to relocate segments to storage tiers
+ * TODO: we could potentially get rid of tagOverrideConfig and rely on this relocator for moving COMPLETED segments
+ */
+public class TieredStorageRelocator extends ControllerPeriodicTask<Void> {

Review comment:
       Please keep only one segment relocator. Currently there are two relocators: `TieredStorageRelocator` and `RealtimeSegmentRelocator`. They can rebalance the same table at the same time, which could cause problems. Recommend replacing `RealtimeSegmentRelocator` with a single `SegmentRelocator` that handles both completed-segment relocation and tiered-storage relocation.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) {
-    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
+    InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;
+    // rebalance tiers first
+    List<Map<String, Map<String, String>>> newTierAssignments = null;
+    if (tierInstancePartitionsMap != null && !tierInstancePartitionsMap.isEmpty()) {
+      LOGGER.info("Rebalancing tiers: {} for table: {} with bootstrap: {}", tierInstancePartitionsMap.keySet(),
+          _offlineTableName, bootstrap);
+
+      // get tier to segment assignment map i.e. current assignments split by tiers they are eligible for
+      SegmentAssignmentUtils.TierSegmentAssignment tierSegmentAssignment =
+          new SegmentAssignmentUtils.TierSegmentAssignment(_offlineTableName, _sortedTiers, currentAssignment);
+      Map<String, Map<String, Map<String, String>>> tierNameToSegmentAssignmentMap =
+          tierSegmentAssignment.getTierNameToSegmentAssignmentMap();
+
+      // for each tier, calculate new assignment using instancePartitions for that tier
+      newTierAssignments = new ArrayList<>(tierNameToSegmentAssignmentMap.size());
+      for (Map.Entry<String, Map<String, Map<String, String>>> entry : tierNameToSegmentAssignmentMap.entrySet()) {
+        String tierName = entry.getKey();
+        Map<String, Map<String, String>> tierCurrentAssignment = entry.getValue();
+
+        InstancePartitions tierInstancePartitions = tierInstancePartitionsMap.get(tierName);
+        Preconditions
+            .checkNotNull(tierInstancePartitions, "Failed to find instance partitions for tier: %s of table: %s",
+                tierName, _offlineTableName);
+        checkReplication(tierInstancePartitions);
+
+        LOGGER.info("Rebalancing tier: {} for table: {} with instance partitions: {}", tierName, _offlineTableName,

Review comment:
       (nit) Include `bootstrap` in the log?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -91,6 +104,17 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
     }
   }
 
+  /**
+   * Returns a sorted list of Tiers from the TierConfigList in table config.
+   * Keeps only those which have "pinotServer" storage type.
+   */
+  @VisibleForTesting
+  protected List<Tier> getSortedTiersForPinotServerStorage(List<TierConfig> tierConfigList) {
+    return tierConfigList.stream().filter(t -> TierFactory.PINOT_SERVER_STORAGE_TYPE.equals(t.getStorageType()))

Review comment:
       Prefer the old non-lambda approach for both performance and readability.
   Also consider moving this common logic into `TierUtils`.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) {
-    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
+    InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;

Review comment:
       Rename to `nonTierAssignment` for clarity?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, Configuration config) {
-    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
+    InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;
+    // rebalance tiers first

Review comment:
       (nit) Capitalize the first letter of the comment, per convention.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org