You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/11/29 21:59:53 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7828: Fix thread safety issue and add cache to EmptySegmentPruner

Jackie-Jiang commented on a change in pull request #7828:
URL: https://github.com/apache/pinot/pull/7828#discussion_r758768399



##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
##########
@@ -69,66 +68,99 @@ public void init(IdealState idealState, ExternalView externalView, Set<String> o
       segments.add(segment);
       segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
     }
+    _segmentsLoaded.addAll(segments);
+    Set<String> emptySegments = new HashSet<>();
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false);
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(segment, znRecords.get(i));
-      _segmentTotalDocsMap.put(segment, totalDocs);
-      if (totalDocs == 0) {
-        _emptySegments.add(segment);
+      if (isEmpty(segment, znRecords.get(i))) {
+        emptySegments.add(segment);
       }
     }
-  }
-
-  private long extractTotalDocsFromSegmentZKMetaZNRecord(String segment, @Nullable ZNRecord znRecord) {
-    if (znRecord == null) {
-      LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _tableNameWithType);
-      return -1;
-    }
-    return znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
+    _emptySegments = emptySegments;
   }
 
   @Override
   public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
       Set<String> onlineSegments) {
     // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
     //       ones. The refreshed segment ZK metadata change won't be picked up.
+    boolean emptySegmentsChanged = false;
+    Set<String> emptySegments = new HashSet<>(_emptySegments);
     for (String segment : onlineSegments) {
-      _segmentTotalDocsMap.computeIfAbsent(segment, k -> {
-        long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(k,
-            _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, AccessOption.PERSISTENT));
-        if (totalDocs == 0) {
-          _emptySegments.add(segment);
+      if (_segmentsLoaded.add(segment)) {
+        if (isEmpty(segment,
+            _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT))) {
+          emptySegmentsChanged |= emptySegments.add(segment);
         }
-        return totalDocs;
-      });
+      }
+    }
+    _segmentsLoaded.retainAll(onlineSegments);
+    emptySegmentsChanged |= emptySegments.retainAll(onlineSegments);
+
+    if (emptySegmentsChanged) {
+      _emptySegments = emptySegments;
+      // Reset the result cache when empty segments changed
+      _resultCache = null;
     }
-    _segmentTotalDocsMap.keySet().retainAll(onlineSegments);
-    _emptySegments.retainAll(onlineSegments);
   }
 
   @Override
   public synchronized void refreshSegment(String segment) {
-    long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT));
-    _segmentTotalDocsMap.put(segment, totalDocs);
-    if (totalDocs == 0) {
-      _emptySegments.add(segment);
+    _segmentsLoaded.add(segment);
+    if (isEmpty(segment, _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT))) {
+      if (!_emptySegments.contains(segment)) {
+        Set<String> emptySegments = new HashSet<>(_emptySegments);
+        emptySegments.add(segment);
+        _emptySegments = emptySegments;
+        // Reset the result cache when empty segments changed
+        _resultCache = null;
+      }
     } else {
-      _emptySegments.remove(segment);
+      if (_emptySegments.contains(segment)) {
+        Set<String> emptySegments = new HashSet<>(_emptySegments);
+        emptySegments.remove(segment);
+        _emptySegments = emptySegments;
+        // Reset the result cache when empty segments changed
+        _resultCache = null;
+      }
     }
   }
 
-  /**
-   * Prune out segments which are empty
-   */
+  private boolean isEmpty(String segment, @Nullable ZNRecord segmentZKMetadataZNRecord) {
+    if (segmentZKMetadataZNRecord == null) {
+      LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _tableNameWithType);
+      return false;
+    }
+    return segmentZKMetadataZNRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1) == 0;
+  }
+
   @Override
   public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
-    if (_emptySegments.isEmpty()) {
+    Set<String> emptySegments = _emptySegments;
+    if (emptySegments.isEmpty()) {
       return segments;
     }
+
+    // Return the cached result when the input is the same reference
+    ResultCache resultCache = _resultCache;
+    if (resultCache != null && resultCache._inputSegments == segments) {

Review comment:
       The reference comparison (`==`) here is intentional: we don't want to compare the actual contents of the segment sets because that can be expensive.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org