Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/09/15 21:17:54 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7427: Fix race conditions between segment merge/roll-up and purge (or convertToRawIndex) tasks

Jackie-Jiang commented on a change in pull request #7427:
URL: https://github.com/apache/pinot/pull/7427#discussion_r709567468



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -85,6 +85,7 @@
 
   public static class CustomHeaders {
     public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
+    public static final String REPLACE_ONLY = "REPLACE_ONLY";

Review comment:
       Suggest renaming it to `REFRESH_ONLY`, which is the term currently used in Pinot.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -156,7 +167,10 @@ private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSe
         // (creation time is not included in the crc)
         existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
         existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
-        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, existingSegmentZKMetadata)) {
+        // NOTE: in rare cases the segment can be deleted before the metadata is updated, we should fail the request for

Review comment:
       Let's just fail the upload when the version does not match, regardless of whether the flag is set. This is consistent with the behavior of locking the segment, and more robust: if the metadata changes during the locking period, just abort the upload.
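To illustrate the "always check the version" behavior suggested above, here is a minimal in-memory sketch of a version-checked update. It is not Pinot or ZooKeeper code: the class `VersionedMetadataStore` and its methods are hypothetical names, and a `ConcurrentHashMap` stands in for the metadata store. The point it shows is that a writer holding a stale version is rejected instead of overwriting a concurrent change.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a versioned metadata store; not a Pinot API.
public class VersionedMetadataStore {
  /** Immutable value paired with the version it was written at. */
  static final class Versioned {
    final String value;
    final int version;

    Versioned(String value, int version) {
      this.value = value;
      this.version = version;
    }
  }

  private final ConcurrentMap<String, Versioned> records = new ConcurrentHashMap<>();

  void create(String key, String value) {
    records.put(key, new Versioned(value, 0));
  }

  Versioned read(String key) {
    return records.get(key);
  }

  /**
   * Writes newValue only if the record still exists at expectedVersion.
   * Returns false when the record was deleted or changed in the meantime.
   */
  boolean update(String key, String newValue, int expectedVersion) {
    Versioned current = records.get(key);
    if (current == null || current.version != expectedVersion) {
      return false;
    }
    // Atomic swap: succeeds only if the map still holds the object we just read.
    return records.replace(key, current, new Versioned(newValue, expectedVersion + 1));
  }

  public static void main(String[] args) {
    VersionedMetadataStore store = new VersionedMetadataStore();
    store.create("segment_0", "crc=1");

    // The upload reads the metadata and remembers its version.
    Versioned snapshot = store.read("segment_0");

    // A concurrent task modifies the same record first.
    boolean concurrentOk = store.update("segment_0", "crc=2", snapshot.version);

    // The upload now holds a stale version, so its write is rejected.
    boolean uploadOk = store.update("segment_0", "crc=3", snapshot.version);

    System.out.println("concurrent update applied: " + concurrentOk);
    System.out.println("stale upload applied: " + uploadOk);
  }
}
```

In this sketch the caller would abort the upload whenever `update` returns false, with no flag deciding whether the check applies.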

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -58,18 +58,25 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle
     _controllerMetrics = controllerMetrics;
   }
 
-  public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
+  public boolean completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
       HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
     String segmentName = segmentMetadata.getName();
     ZNRecord segmentMetadataZNRecord =
         _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+    boolean replaceOnly =
+        "true".equalsIgnoreCase(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REPLACE_ONLY));

Review comment:
       ```suggestion
           Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REPLACE_ONLY));
       ```
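The suggested form should behave the same as the original for every header value, including a missing header: `Boolean.parseBoolean` returns `true` only for a non-null string equal to "true" ignoring case. A small standalone check, assuming nothing beyond the JDK (the class and method names are hypothetical):

```java
// Compares the original and suggested ways of reading a boolean header value.
public class HeaderFlagCheck {
  // Original form from the diff.
  static boolean viaEqualsIgnoreCase(String headerValue) {
    return "true".equalsIgnoreCase(headerValue);
  }

  // Suggested form from the review comment.
  static boolean viaParseBoolean(String headerValue) {
    return Boolean.parseBoolean(headerValue);
  }

  public static void main(String[] args) {
    // null models a request that does not carry the header at all.
    String[] samples = {null, "", "true", "TRUE", "True", "false", "yes", " true"};
    for (String sample : samples) {
      if (viaEqualsIgnoreCase(sample) != viaParseBoolean(sample)) {
        throw new AssertionError("forms disagree on: " + sample);
      }
    }
    System.out.println("both forms agree on all samples");
  }
}
```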

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -387,15 +391,15 @@ private SegmentMetadata getSegmentMetadata(File tempDecryptedFile, File tempSegm
     return MetadataExtractorFactory.create(metadataProviderClass).extractMetadata(tempDecryptedFile, tempSegmentDir);
   }
 
-  private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile,
-      String tableNameWithType, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation, String crypter)
+  private boolean completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers,

Review comment:
       +1

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -81,18 +88,20 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
 
     LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
+        enableParallelPushProtection, replaceOnly, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,

Review comment:
       Why do we need to pass in the boolean here? This method always processes an existing segment.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -175,10 +189,12 @@ private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSe
         }
 
         _pinotHelixResourceManager
-            .refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, zkDownloadURI, crypter);
+            .refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata,
+                replaceOnly ? expectedVersion : -1, zkDownloadURI, crypter);

Review comment:
       Same here: always check the expected version, regardless of the flag.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


