You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/10/05 18:03:25 UTC

[GitHub] [pinot] snleee commented on a change in pull request #7481: MergeRollupTaskGenerator enhancement: enable parallel buckets scheduling

snleee commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r722467149



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());

Review comment:
       Instead of pre-adding an empty `ArrayList` here, we can check the size of `_selectedSegments` at the beginning of the `add()` call and create the first bucket list lazily.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -371,17 +471,26 @@ private boolean isMergedSegment(SegmentZKMetadata segmentZKMetadata, String merg
   }
 
   /**
-   * Check if the merge window end time is valid
+   * Get the merge window end time
    */
-  private boolean isValidMergeWindowEndTime(long windowEndMs, long bufferMs, String lowerMergeLevel,
-      MergeRollupTaskMetadata mergeRollupTaskMetadata) {
-    // Check that execution window endTimeMs <= now - bufferTime
-    if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-      return false;
+  private long getMergeWindowEndTime(long windowStartMs, long bucketMs, long numParallelBuckets, long bufferMs,

Review comment:
       One edge case to think of:
   
   Let's say we have the following data available:
   
   8/10, 8/11 (no data), 8/12, 8/13 and parallelism = 3
   
   In this case, the current algorithm will schedule the window `8/10, 8/11, 8/12`, and only 2 tasks will be scheduled because 8/11 has no data. To fully utilize the parallelism, an optimized algorithm would schedule `8/10, 8/12, 8/13` instead. 
   
   I'm fine with not handling this case, but if the fix is relatively simple, the optimized version would be slightly better.
   
   
   
   

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added and it indicates that the selection result is sealed,
+     * otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = _selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to current bucket, add it to the bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to current buckets, need to create a new bucket
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;
+      }
+      if (!isMergedSegment(segment, _mergeLevel)) {
+        if (!_hasUnmergedSegments) {
+          _firstUnmergedBucket = _selectedSegments.size() - 1;
+        }
+        _hasUnmergedSegments = true;
+      }
+      return true;
+    }
+
+    boolean hasUnmergedSegments() {

Review comment:
       Don't we need to track `hasUnmergedSegments` for each bucket?

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added and it indicates that the selection result is sealed,
+     * otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = _selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to current bucket, add it to the bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to current buckets, need to create a new bucket
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;

Review comment:
       If we directly `return false` here, I think we can remove `_hasSpilledOverData`. At this point, we have already added the spilled-over segment to the current bucket's selected segment list, so why do we wait until the next call to `add()` to indicate that the result is sealed?

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -232,32 +243,31 @@ public String getTaskType() {
             long endTimeMs = preSelectedSegment.getEndTimeMs();
             if (endTimeMs >= windowStartMs) {
               // For segments overlapping with current window, add to the result list
-              selectedSegments.add(preSelectedSegment);
-              if (!isMergedSegment(preSelectedSegment, mergeLevel)) {
-                hasUnmergedSegments = true;
+              if (!selectedSegments.add(preSelectedSegment)) {
+                break;
               }
             }
             // endTimeMs < windowStartMs
             // Haven't find the first overlapping segment, continue to the next segment
           } else {
             // Has gone through all overlapping segments for current window
-            if (hasUnmergedSegments) {
+            if (selectedSegments.hasUnmergedSegments()) {
               // Found unmerged segments, schedule merge task for current window
               break;
             } else {
               // No unmerged segments found, clean up selected segments and bump up the merge window
               // TODO: If there are many small merged segments, we should merge them again
               selectedSegments.clear();
-              selectedSegments.add(preSelectedSegment);
-              if (!isMergedSegment(preSelectedSegment, mergeLevel)) {
-                hasUnmergedSegments = true;
-              }
               windowStartMs = startTimeMs / bucketMs * bucketMs;
-              windowEndMs = windowStartMs + bucketMs;
-              if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
+              windowEndMs = getMergeWindowEndTime(windowStartMs, bucketMs, numParallelBuckets, bufferMs,
+                  lowerMergeLevel, mergeRollupTaskMetadata);
+              if (!isValidMergeWindow(windowStartMs, windowEndMs)) {
                 isValidMergeWindow = false;
                 break;
               }
+              if (!selectedSegments.add(preSelectedSegment)) {

Review comment:
       Add a comment explaining why we are breaking here. Even though `add()` already has comments, it will be easier to follow if we have the comment at this call site as well.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;

Review comment:
       `_selectedSegmentsForBuckets`

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -361,6 +447,20 @@ private boolean validate(TableConfig tableConfig, String taskType) {
     return true;
   }
 
+  /**
+   * Check if two segments' start time belong to the same bucket
+   */
+  private boolean startInSameBucket(long startTimeMs1, long startTimeMs2, long bucketMs) {

Review comment:
       `startInSameBucket` -> `startAtSameBucket` or `belongToSameBucket`

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added and it indicates that the selection result is sealed,
+     * otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = _selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to current bucket, add it to the bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to current buckets, need to create a new bucket
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;
+      }
+      if (!isMergedSegment(segment, _mergeLevel)) {
+        if (!_hasUnmergedSegments) {
+          _firstUnmergedBucket = _selectedSegments.size() - 1;
+        }
+        _hasUnmergedSegments = true;
+      }
+      return true;
+    }
+
+    boolean hasUnmergedSegments() {
+      return _hasUnmergedSegments;
+    }
+
+    void clear() {
+      _hasSpilledOverData = false;
+      _selectedSegments.clear();
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    List<List<SegmentZKMetadata>> getSelectedSegments() {

Review comment:
       `getSelectedSegmentsForAllBuckets()` may be more clear?

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added and it indicates that the selection result is sealed,
+     * otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = _selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to current bucket, add it to the bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to current buckets, need to create a new bucket
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;
+      }
+      if (!isMergedSegment(segment, _mergeLevel)) {
+        if (!_hasUnmergedSegments) {
+          _firstUnmergedBucket = _selectedSegments.size() - 1;
+        }
+        _hasUnmergedSegments = true;
+      }
+      return true;
+    }
+
+    boolean hasUnmergedSegments() {
+      return _hasUnmergedSegments;
+    }
+
+    void clear() {
+      _hasSpilledOverData = false;
+      _selectedSegments.clear();
+      _selectedSegments.add(new ArrayList<>());

Review comment:
       Let's remove this and instead create the bucket list lazily in `add()`, as mentioned before.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org