You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/12 21:29:30 UTC

[GitHub] [pinot] jackjlli commented on a diff in pull request #9309: Introduce Segment AssignmentStrategy Interface

jackjlli commented on code in PR #9309:
URL: https://github.com/apache/pinot/pull/9309#discussion_r968946171


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactory.java:
##########
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory for SegmentAssignmentStrategy
+ */
+public class SegmentAssignmentStrategyFactory {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentAssignmentStrategyFactory.class);
+  private SegmentAssignmentStrategyFactory() {
+  }
+
+  /**
+   * Determine Segment Assignment strategy
+   */
+  public static Map<InstancePartitionsType, SegmentAssignmentStrategy>
+  getSegmentAssignmentStrategy(HelixManager helixManager,
+      TableConfig tableConfig, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    String assignmentStrategy = null;
+    Map<InstancePartitionsType, SegmentAssignmentStrategy> segmentAssignmentStrategyMap = new HashMap<>();
+
+    TableType currentTableType = tableConfig.getTableType();
+    // TODO: Handle segment assignment strategy in future for CONSUMING segments in follow up PR
+    InstancePartitionsType instancePartitionsType = currentTableType == TableType.OFFLINE
+        ? InstancePartitionsType.OFFLINE
+        : InstancePartitionsType.COMPLETED;
+
+    // Accommodate new changes for assignment strategy
+    Map<String, SegmentAssignmentConfig>
+        segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
+
+    if (tableConfig.isDimTable()) {
+      // Segment Assignment Strategy for DIM tables
+      SegmentAssignmentStrategy segmentAssignmentStrategy = new DimTableSegmentAssignmentStrategy();
+      segmentAssignmentStrategy.init(helixManager, tableConfig);
+      segmentAssignmentStrategyMap.put(instancePartitionsType, segmentAssignmentStrategy);

Review Comment:
   Should we add a precondition check here to make sure that the `instancePartitionsType` is `OFFLINE` rather than `COMPLETED`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java:
##########
@@ -388,4 +395,89 @@ public Map<String, Map<String, String>> getNonTierSegmentAssignment() {
       return _nonTierSegmentAssignment;
     }
   }
+
+  /**
+   * Returns a partition id for offline table
+   */
+  public static int getOfflineSegmentPartitionId(String segmentName, String offlineTableName,
+      HelixManager helixManager, @Nullable String partitionColumn) {
+    SegmentZKMetadata segmentZKMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), offlineTableName, segmentName);
+    Preconditions.checkState(segmentZKMetadata != null,
+        "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, offlineTableName);
+    return getPartitionId(segmentZKMetadata, offlineTableName, partitionColumn);
+  }
+
+  private static int getPartitionId(SegmentZKMetadata segmentZKMetadata,
+      String offlineTableName, @Nullable String partitionColumn) {

Review Comment:
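   Does this have to be specific to offline tables? The partition id is read from the segment ZK metadata, and the table name only appears in the error messages.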



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java:
##########
@@ -388,4 +395,89 @@ public Map<String, Map<String, String>> getNonTierSegmentAssignment() {
       return _nonTierSegmentAssignment;
     }
   }
+
+  /**
+   * Returns a partition id for offline table
+   */
+  public static int getOfflineSegmentPartitionId(String segmentName, String offlineTableName,
+      HelixManager helixManager, @Nullable String partitionColumn) {
+    SegmentZKMetadata segmentZKMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), offlineTableName, segmentName);
+    Preconditions.checkState(segmentZKMetadata != null,
+        "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, offlineTableName);
+    return getPartitionId(segmentZKMetadata, offlineTableName, partitionColumn);
+  }
+
+  private static int getPartitionId(SegmentZKMetadata segmentZKMetadata,
+      String offlineTableName, @Nullable String partitionColumn) {
+    String segmentName = segmentZKMetadata.getSegmentName();
+    ColumnPartitionMetadata partitionMetadata =
+        segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(partitionColumn);
+    Preconditions.checkState(partitionMetadata != null,
+        "Segment ZK metadata for segment: %s of table: %s does not contain partition metadata for column: %s",
+        segmentName, offlineTableName, partitionColumn);
+    Set<Integer> partitions = partitionMetadata.getPartitions();
+    Preconditions.checkState(partitions.size() == 1,
+        "Segment ZK metadata for segment: %s of table: %s contains multiple partitions for column: %s", segmentName,
+        offlineTableName, partitionColumn);
+    return partitions.iterator().next();
+  }
+
+  /**
+   * Returns map of instance partition id to segments for offline tables
+   */
+  public static Map<Integer, List<String>> getOfflineInstancePartitionIdToSegmentsMap(Set<String> segments,
+      int numInstancePartitions, String offlineTableName,
+      HelixManager helixManager, @Nullable String partitionColumn) {
+    // Fetch partition id from segment ZK metadata
+    List<SegmentZKMetadata> segmentsZKMetadata =
+        ZKMetadataProvider.getSegmentsZKMetadata(helixManager.getHelixPropertyStore(), offlineTableName);
+
+    Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>();
+    Set<String> segmentsWithoutZKMetadata = new HashSet<>(segments);
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      if (segmentsWithoutZKMetadata.remove(segmentName)) {
+        int partitionId = getPartitionId(segmentZKMetadata, offlineTableName, partitionColumn);
+        int instancePartitionId = partitionId % numInstancePartitions;
+        instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName);
+      }
+    }
+    Preconditions.checkState(segmentsWithoutZKMetadata.isEmpty(), "Failed to find ZK metadata for segments: %s",
+        segmentsWithoutZKMetadata);
+
+    return instancePartitionIdToSegmentsMap;
+  }
+
+  /**
+   * Returns a partition id for realtime table
+   */
+  public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
+      HelixManager helixManager, @Nullable String partitionColumn) {
+    Integer segmentPartitionId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, realtimeTableName, helixManager, partitionColumn);
+    if (segmentPartitionId == null) {
+      // This case is for the uploaded segments for which there's no partition information.
+      // A random, but consistent, partition id is calculated based on the hash code of the segment name.
+      // Note that '% 10K' is used to prevent having partition ids with large value which will be problematic later in
+      // instance assignment formula.
+      segmentPartitionId = Math.abs(segmentName.hashCode() % 10_000);

Review Comment:
   Shouldn't we reuse one of the existing partition functions, such as `HashCodePartitionFunction`, instead of maintaining yet another piece of logic to generate the partition id here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java:
##########
@@ -93,62 +96,22 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
   protected abstract int getReplication(TableConfig tableConfig);
 
   /**
-   * Helper method to check whether the number of replica-groups matches the table replication for replica-group based
-   * instance partitions. Log a warning if they do not match and use the one inside the instance partitions. The
-   * mismatch can happen when table is not configured correctly (table replication and numReplicaGroups does not match
-   * or replication changed without reassigning instances).
+   * Set Segment assignment strategy for different instance partitions and puts into a map of
+   * Map<InstancePartitionsType, SegmentAssignmentStrategy>

Review Comment:
   The Javadoc here says the method puts entries into a `Map<InstancePartitionsType, SegmentAssignmentStrategy>`, but the parameter is a `Map<InstancePartitionsType, InstancePartitions>`. The Javadoc needs to be updated to match.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java:
##########
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaGroupSegmentAssignmentStrategy.class);
+
+  private static HelixManager _helixManager;
+  private static String _tableName;
+  private static String _partitionColumn;
+  private int _replication;
+  private TableConfig _tableConfig;
+
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    _helixManager = helixManager;
+    _tableConfig = tableConfig;
+    _tableName = tableConfig.getTableName();
+    _replication = tableConfig.getValidationConfig().getReplicationNumber();
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
+        tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();

Review Comment:
   Nit: store the validation config in a local variable, since it is fetched twice within this method.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalanceNumSegmentAssignmentStrategy.java:
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Balance num Segment assignment strategy class for offline segment assignment
+ * <ul>
+ *   <li>
+ *     <p>This segment assignment strategy is used when table replication/ num_replica_groups = 1.</p>
+ *   </li>
+ * </ul>
+ */
+public class BalanceNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BalanceNumSegmentAssignmentStrategy.class);
+
+  private String _tableName;
+  private int _replication;
+
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    _tableName = tableConfig.getTableName();
+    _replication = tableConfig.getValidationConfig().getReplicationNumber();

Review Comment:
   Do we want a null check for the validation config here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalanceNumSegmentAssignmentStrategy.java:
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Balance num Segment assignment strategy class for offline segment assignment
+ * <ul>
+ *   <li>
+ *     <p>This segment assignment strategy is used when table replication/ num_replica_groups = 1.</p>
+ *   </li>
+ * </ul>
+ */
+public class BalanceNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BalanceNumSegmentAssignmentStrategy.class);
+
+  private String _tableName;
+  private int _replication;
+
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    _tableName = tableConfig.getTableName();
+    _replication = tableConfig.getValidationConfig().getReplicationNumber();
+    LOGGER.info("Initialized BalanceNumSegmentAssignmentStrategy for table: "
+            + "{} with replication: {}", _tableName, _replication);
+  }
+
+  @Override
+  public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
+      InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
+    validateSegmentAssignmentStrategy(instancePartitions);
+    return SegmentAssignmentUtils
+        .assignSegmentWithoutReplicaGroup(currentAssignment, instancePartitions, _replication);
+  }
+
+  @Override
+  public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String, String>> currentAssignment,
+      InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
+    validateSegmentAssignmentStrategy(instancePartitions);
+    Map<String, Map<String, String>> newAssignment;
+    List<String> instances =
+        SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
+    newAssignment = SegmentAssignmentUtils
+        .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, _replication);
+    return newAssignment;
+  }
+
+  private void validateSegmentAssignmentStrategy(InstancePartitions instancePartitions) {
+    int numReplicaGroups = instancePartitions.getNumReplicaGroups();
+    int numPartitions = instancePartitions.getNumPartitions();
+
+    Preconditions.checkState(numReplicaGroups == 1, "Replica groups should be 1");

Review Comment:
   Could you add some Javadoc to this method explaining why both of these values must be 1? Alternatively, make the error message more descriptive, e.g. `Replica groups should be 1 in order to use BalanceNumSegmentAssignmentStrategy`.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalanceNumSegmentAssignmentStrategy.java:
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Balance num Segment assignment strategy class for offline segment assignment
+ * <ul>
+ *   <li>
+ *     <p>This segment assignment strategy is used when table replication/ num_replica_groups = 1.</p>
+ *   </li>
+ * </ul>
+ */
+public class BalanceNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BalanceNumSegmentAssignmentStrategy.class);
+
+  private String _tableName;

Review Comment:
   Nit: rename this to `_tableNameWithType`, since the value is fetched from the table config.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -125,8 +133,13 @@ private List<String> assignConsumingSegment(String segmentName, InstancePartitio
       return instancesAssigned;
     } else {
       // Replica-group based assignment
-
-      checkReplication(instancePartitions);
+      // TODO: Refactor check replication this for segment assignment strategy in follow up PR

Review Comment:
   Nit: link the corresponding OSS issue here. The same applies to the other `TODO`s in this PR.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java:
##########
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaGroupSegmentAssignmentStrategy.class);
+
+  private static HelixManager _helixManager;
+  private static String _tableName;
+  private static String _partitionColumn;
+  private int _replication;
+  private TableConfig _tableConfig;
+
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    _helixManager = helixManager;
+    _tableConfig = tableConfig;
+    _tableName = tableConfig.getTableName();
+    _replication = tableConfig.getValidationConfig().getReplicationNumber();
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
+        tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
+    _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
+    if (_partitionColumn == null) {
+      LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
+              + "with replication: {} without partition column for table: {} ", _replication, _tableName);
+    } else {
+      LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
+              + "with replication: {} and partition column: {} for table: {}",
+              _replication, _partitionColumn, _tableName);
+    }
+  }
+
+  /**
+   * Assigns the segment for the replica-group based segment assignment strategy and returns the assigned instances.
+   */
+  @Override
+  public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
+      InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
+    int numPartitions = instancePartitions.getNumPartitions();
+    SegmentAssignmentStrategyUtils.checkReplication(instancePartitions, _replication, _tableName);
+    int partitionId;
+    if (_partitionColumn == null || numPartitions == 1) {
+      partitionId = 0;
+    } else {
+      // Uniformly spray the segment partitions over the instance partitions
+      if (_tableConfig.getTableType() == TableType.OFFLINE) {
+        partitionId =
+            SegmentAssignmentUtils
+                .getOfflineSegmentPartitionId(segmentName, _tableName, _helixManager, _partitionColumn) % numPartitions;

Review Comment:
   Same here, we have a bunch of partition functions to generate the partition Ids already. We should consider reusing them if possible.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java:
##########
@@ -93,62 +96,22 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
   protected abstract int getReplication(TableConfig tableConfig);
 
   /**
-   * Helper method to check whether the number of replica-groups matches the table replication for replica-group based
-   * instance partitions. Log a warning if they do not match and use the one inside the instance partitions. The
-   * mismatch can happen when table is not configured correctly (table replication and numReplicaGroups does not match
-   * or replication changed without reassigning instances).
+   * Set Segment assignment strategy for different instance partitions and puts into a map of
+   * Map<InstancePartitionsType, SegmentAssignmentStrategy>
    */
-  protected void checkReplication(InstancePartitions instancePartitions) {
-    int numReplicaGroups = instancePartitions.getNumReplicaGroups();
-    if (numReplicaGroups != _replication) {
-      _logger.warn(
-          "Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for "
-              + "table: {}, using: {}", instancePartitions.getInstancePartitionsName(), numReplicaGroups, _replication,
-          _tableNameWithType, numReplicaGroups);
-    }
-  }
-
-  /**
-   * Helper method to assign instances based on the current assignment and instance partitions.
-   */
-  protected List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
-      InstancePartitions instancePartitions) {
-    int numReplicaGroups = instancePartitions.getNumReplicaGroups();
-    int numPartitions = instancePartitions.getNumPartitions();
-
-    if (numReplicaGroups == 1 && numPartitions == 1) {
-      // Non-replica-group based assignment
-
-      return SegmentAssignmentUtils.assignSegmentWithoutReplicaGroup(currentAssignment, instancePartitions,
-          _replication);
-    } else {
-      // Replica-group based assignment
-
-      checkReplication(instancePartitions);
-
-      int partitionId;
-      if (_partitionColumn == null || numPartitions == 1) {
-        partitionId = 0;
-      } else {
-        // Uniformly spray the segment partitions over the instance partitions
-        partitionId = getSegmentPartitionId(segmentName) % numPartitions;
-      }
-
-      return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
-    }
+  protected void setSegmentAssignmentStrategyMap(
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    _assignmentStrategyMap = SegmentAssignmentStrategyFactory
+        .getSegmentAssignmentStrategy(_helixManager, _tableConfig, instancePartitionsMap);

Review Comment:
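   The method name seems to contradict what this method actually does. It is called `setSegmentAssignmentStrategyMap`, but it takes an instance partitions map and builds the strategy map through `SegmentAssignmentStrategyFactory`.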



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org