You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "klsince (via GitHub)" <gi...@apache.org> on 2023/02/27 22:09:39 UTC

[GitHub] [pinot] klsince commented on a diff in pull request #10255: Allow replica group assignment support in tier configs

klsince commented on code in PR #10255:
URL: https://github.com/apache/pinot/pull/10255#discussion_r1119314780


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java:
##########
@@ -80,39 +83,68 @@ public class PinotInstanceAssignmentRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tables/{tableName}/instancePartitions")
   @ApiOperation(value = "Get the instance partitions")
-  public Map<InstancePartitionsType, InstancePartitions> getInstancePartitions(
+  public Map<String, InstancePartitions> getInstancePartitions(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
-          InstancePartitionsType instancePartitionsType) {
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|Name of the tier") @QueryParam("type") @Nullable String type) {
+    Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
 
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType != TableType.REALTIME) {
-      if (instancePartitionsType == InstancePartitionsType.OFFLINE || instancePartitionsType == null) {
-        InstancePartitions offlineInstancePartitions = InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.OFFLINE.toString().equals(type) || type == null) {
+        InstancePartitions offlineInstancePartitions =
+            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
         if (offlineInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.OFFLINE, offlineInstancePartitions);
+          instancePartitionsMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstancePartitions);
         }
       }
     }
     if (tableType != TableType.OFFLINE) {
-      if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
-        InstancePartitions consumingInstancePartitions = InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.CONSUMING.toString().equals(type) || type == null) {
+        InstancePartitions consumingInstancePartitions =
+            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
         if (consumingInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
+          instancePartitionsMap.put(InstancePartitionsType.CONSUMING.toString(), consumingInstancePartitions);
         }
       }
-      if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
-        InstancePartitions completedInstancePartitions = InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.COMPLETED.toString().equals(type) || type == null) {
+        InstancePartitions completedInstancePartitions =
+            InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
         if (completedInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions);
+          instancePartitionsMap.put(InstancePartitionsType.COMPLETED.toString(), completedInstancePartitions);
+        }
+      }
+    }
+
+    TableConfig realtimeTableConfig = _resourceManager.getRealtimeTableConfig(tableName);
+    if (realtimeTableConfig != null && CollectionUtils.isNotEmpty(realtimeTableConfig.getTierConfigsList())) {
+      for (TierConfig tierConfig : realtimeTableConfig.getTierConfigsList()) {
+        if (type == null || type.equals(tierConfig.getName())) {
+          InstancePartitions instancePartitions =
+              InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+                  InstancePartitionsUtils.getInstancePartitonNameForTier(realtimeTableConfig.getTableName(),
+                      tierConfig.getName()));
+          if (instancePartitions != null) {
+            instancePartitionsMap.put(tierConfig.getName(), instancePartitions);
+          }
+        }
+      }
+    }
+
+    TableConfig offlineTableConfig = _resourceManager.getOfflineTableConfig(tableName);
+    if (offlineTableConfig != null && CollectionUtils.isNotEmpty(offlineTableConfig.getTierConfigsList())) {
+      for (TierConfig tierConfig : offlineTableConfig.getTierConfigsList()) {
+        if (type == null || type.equals(tierConfig.getName())) {
+          InstancePartitions instancePartitions =
+              InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+                  InstancePartitionsUtils.getInstancePartitonNameForTier(realtimeTableConfig.getTableName(),

Review Comment:
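   Should this be `offlineTableConfig.getTableName()`? This loop iterates over the offline table's tier configs, and `realtimeTableConfig` may be null here.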



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1757,6 +1758,23 @@ private void assignInstances(TableConfig tableConfig, boolean override) {
         }
       }
     }
+
+    // Process and persist tier config instancePartitions
+    if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+        && tableConfig.getInstanceAssignmentConfigMap() != null) {
+      for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+        if (tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName())) {
+          if (override || InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+              InstancePartitionsUtils.getInstancePartitonNameForTier(tableNameWithType, tierConfig.getName()))
+              == null) {
+            InstancePartitions instancePartitions =
+                instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null,
+                    tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
+            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);

Review Comment:
   Please add some INFO logs here, as is done in the for-loop above.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -545,10 +547,54 @@ private Map<String, InstancePartitions> getTierToInstancePartitionsMap(String ta
    *  InstancePartitions to zk.
    *  Then we'll be able to support replica group assignment while creating InstancePartitions for tiers
    */
-  private InstancePartitions getInstancePartitionsForTier(Tier tier, String tableNameWithType) {

Review Comment:
   Could you update the method comment a bit?
   
   Also, I feel it is probably better to keep this method functional, without persisting state into ZK as a side effect. The persisting logic could be put in separate util methods and called by TableRebalancer based on the dryRun/bootstrap flags.



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -261,6 +263,20 @@ public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord> propertySt
     }
   }
 
+  @Nullable
+  public static List<InstancePartitions> getAllInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore) {
+    List<ZNRecord> znRecordss =
+        propertyStore.getChildren(PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, null, AccessOption.PERSISTENT);
+
+    try {
+      return Optional.ofNullable(znRecordss).orElseGet(ArrayList::new).stream().map(InstancePartitions::fromZNRecord)
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while getting user list configuration", e);

Review Comment:
   Looks like the error message needs to be adjusted for this method: it refers to the user list configuration, but this method fetches instance partitions.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -82,4 +83,34 @@ public InstancePartitions assignInstances(InstancePartitionsType instancePartiti
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
     return instancePartitions;
   }
+
+  // TODO (saurabh) : Move commons

Review Comment:
   Curious: is there some complexity in extracting the common methods in this PR? I might have missed it. If so, maybe call it out in the TODO comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org