You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/01/17 06:19:04 UTC

[GitHub] [pinot] npawar commented on a change in pull request #7058: Add new partition group metadata at the time of segment commit

npawar commented on a change in pull request #7058:
URL: https://github.com/apache/pinot/pull/7058#discussion_r785662017



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -588,17 +588,46 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
       lock.unlock();
     }
 
-    // TODO: also create the new partition groups here, instead of waiting till the {@link
-    //  RealtimeSegmentValidationManager} runs
+    //  Creates new partition groups here instead of waiting till the {@link RealtimeSegmentValidationManager} runs
     //  E.g. If current state is A, B, C, and newPartitionGroupMetadataList contains B, C, D, E,
-    //  then create metadata/idealstate entries for D, E along with the committing partition's entries.
-    //  Ensure that multiple committing segments don't create multiple new segment metadata and ideal state entries
-    //  for the same partitionGroup
+    //  then metadata/idealstate entries for D, E are created along with the committing partition's entries.
+
+    addNewPartitionGroups(realtimeTableName, tableConfig, instancePartitions, idealState, numReplicas, streamConfig,
+        newPartitionGroupMetadataList, numPartitionGroups, segmentAssignment, instancePartitionsMap);
 
     // Trigger the metadata event notifier
     _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
   }
 
+  /**
+   * Method is kept synchronised so that multiple committing segments don't create multiple new segment metadata
+   * and ideal state entries for the same partitionGroup
+   */
+  private synchronized void addNewPartitionGroups(String realtimeTableName, TableConfig tableConfig,
+      InstancePartitions instancePartitions, IdealState idealState, int numReplicas,
+      PartitionLevelStreamConfig streamConfig, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
+      int numPartitionGroups, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap =
+        getLatestSegmentZKMetadataMap(realtimeTableName);
+
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+      int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+      if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
+        String newSegmentName =
+            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, getCurrentTimeMs(),

Review comment:
       there can be a race condition between this and the RealtimeSegmentValidationManager? Check the other invocation of this method `setupNewPartitionGroup`, which happens in the controller periodic background thread.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org