Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/07 04:40:05 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #9905: Fix race condition when 2 segment upload occurred for the same segment

Jackie-Jiang commented on code in PR #9905:
URL: https://github.com/apache/pinot/pull/9905#discussion_r1041746925


##########
pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java:
##########
@@ -173,6 +173,10 @@ public void testMetadataUploadType()
     Assert.assertTrue(segmentTar.exists());
     checkSegmentZkMetadata(segmentName, 12345L, 123L);
 
+    TestUtils.waitForCondition(

Review Comment:
   This shouldn't be required.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -323,7 +327,7 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment
         // Cleanup the Zk entry and the segment from the permanent directory if it exists.
         LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType,
             e);
-        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
+        deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime);
         LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);

Review Comment:
   Let's move this log into `deleteSegmentIfNeeded()` so it is emitted only when we actually delete the segment. Same for the other call sites.
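A minimal sketch of the logging suggestion above. It assumes a hypothetical in-memory `InMemorySegmentStore` in place of `PinotHelixResourceManager`, and uses `java.util.logging` instead of Pinot's logger only to stay dependency-free. The helper logs inside the branch that actually deletes and returns whether it did, so call sites no longer need an unconditional "Deleted zk entry" line.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical stand-in for the ZK segment metadata; not Pinot's PinotHelixResourceManager API.
class InMemorySegmentStore {
  // segmentName -> upload start time persisted in the (simulated) ZK metadata
  final Map<String, Long> uploadStartTimes = new HashMap<>();

  Long getUploadStartTime(String segmentName) {
    return uploadStartTimes.get(segmentName);
  }

  void deleteSegment(String segmentName) {
    uploadStartTimes.remove(segmentName);
  }
}

class SegmentCleanup {
  // java.util.logging is used here only to keep the sketch self-contained.
  private static final Logger LOGGER = Logger.getLogger(SegmentCleanup.class.getName());
  private final InMemorySegmentStore _store;

  SegmentCleanup(InMemorySegmentStore store) {
    _store = store;
  }

  /**
   * Deletes the segment only if the upload start time in metadata still matches this upload,
   * and logs the deletion here instead of at every call site. Returns true if it deleted.
   */
  boolean deleteSegmentIfNeeded(String tableNameWithType, String segmentName, long currentSegmentUploadStartTime) {
    Long existingSegmentUploadStartTime = _store.getUploadStartTime(segmentName);
    if (existingSegmentUploadStartTime == null) {
      // No metadata left, so there is nothing to clean up.
      return false;
    }
    if (existingSegmentUploadStartTime.longValue() != currentSegmentUploadStartTime) {
      // Another upload owns the metadata now; leave its segment alone.
      LOGGER.info(String.format("Skipping deletion of segment %s for table %s: upload start time %d does not match %d",
          segmentName, tableNameWithType, existingSegmentUploadStartTime, currentSegmentUploadStartTime));
      return false;
    }
    _store.deleteSegment(segmentName);
    LOGGER.info(String.format("Deleted zk entry and segment %s for table %s.", segmentName, tableNameWithType));
    return true;
  }
}
```

With this shape, the caller simply invokes `deleteSegmentIfNeeded(...)` and then throws or logs its own error, without a separate deletion message.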



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -344,14 +349,37 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment
       // Release lock. Expected version will be 0 as we hold a lock and no updates could take place meanwhile.
       newSegmentZKMetadata.setSegmentUploadStartTime(-1);
       if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) {
-        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
-        LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);
-        throw new RuntimeException(
-            String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType));
+        // There is a race condition when it took too much time for the 1st segment upload to process (due to slow
+        // PinotFS access), which leads to the 2nd attempt of segment upload, and the 2nd segment upload succeeded.
+        // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it failed to
+        // update the zk metadata. Instead, the 1st attempt should validate the upload start time one more time. If the
+        // start time doesn't match with the one persisted in zk metadata, segment deletion should be skipped.
+        String errorMsg =
+            String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType);
+        LOGGER.error(errorMsg);
+        deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime);
+        throw new RuntimeException(errorMsg);
       }
     }
   }
 
+  /**
+   * Deletes the segment to be uploaded if the uploadStartTime matches with the one persisted in ZK metadata.
+   */
+  private void deleteSegmentIfNeeded(String tableNameWithType, String segmentName, long currentSegmentUploadStartTime) {
+    ZNRecord existingSegmentMetadataZNRecord =
+        _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+    if (existingSegmentMetadataZNRecord == null) {
+      return;
+    }
+    // Check if the upload start time is set by this thread itself, if yes delete the segment.
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord);
+    long existingSegmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime();
+    if (currentSegmentUploadStartTime == existingSegmentUploadStartTime) {

Review Comment:
   When parallel protection is not enabled, we should probably always delete the metadata.
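The race described in the code comment, together with this suggestion, can be sketched with a hypothetical versioned in-memory store (`FakeZk`) whose `compareAndSet` mimics a ZooKeeper write that succeeds only at an expected version. The flag name `parallelPushProtectionEnabled` is an assumption for illustration, not a confirmed Pinot setting. In the replayed timeline, upload A takes the lock, upload B overwrites the start time and then releases the lock, and A's release at expected version 0 fails. The cleanup then skips deletion when protection is on, and always deletes when it is off.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a ZK znode: every successful write bumps the version.
final class VersionedRecord {
  long uploadStartTime;
  int version;

  VersionedRecord(long uploadStartTime, int version) {
    this.uploadStartTime = uploadStartTime;
    this.version = version;
  }
}

// Hypothetical in-memory metadata store with ZK-style versioned (compare-and-set) writes.
final class FakeZk {
  private final Map<String, VersionedRecord> records = new HashMap<>();

  void create(String segmentName, long uploadStartTime) {
    records.put(segmentName, new VersionedRecord(uploadStartTime, 0));
  }

  VersionedRecord get(String segmentName) {
    return records.get(segmentName);
  }

  // Writes only if the stored version equals expectedVersion, then increments the version.
  boolean compareAndSet(String segmentName, long newUploadStartTime, int expectedVersion) {
    VersionedRecord record = records.get(segmentName);
    if (record == null || record.version != expectedVersion) {
      return false;
    }
    record.uploadStartTime = newUploadStartTime;
    record.version++;
    return true;
  }

  void delete(String segmentName) {
    records.remove(segmentName);
  }
}

final class UploadCleanupPolicy {
  // parallelPushProtectionEnabled is an assumed flag name used for illustration only.
  static boolean deleteSegmentIfNeeded(FakeZk zk, String segmentName, long currentUploadStartTime,
      boolean parallelPushProtectionEnabled) {
    VersionedRecord existing = zk.get(segmentName);
    if (existing == null) {
      return false;
    }
    // Without protection there is no per-upload start time to check, so always clean up.
    // With protection, delete only if the metadata still holds the start time this upload wrote.
    if (!parallelPushProtectionEnabled || existing.uploadStartTime == currentUploadStartTime) {
      zk.delete(segmentName);
      return true;
    }
    return false;
  }

  // Replays the race from the PR comment and returns whether upload A's cleanup deletes the segment.
  static boolean firstUploadDeletes(boolean parallelPushProtectionEnabled) {
    FakeZk zk = new FakeZk();
    long startA = 1000L;
    long startB = 2000L;
    zk.create("seg_0", startA);            // A takes the lock (version 0)
    zk.compareAndSet("seg_0", startB, 0);  // B takes over the stale lock (version 1)
    zk.compareAndSet("seg_0", -1L, 1);     // B succeeds and releases the lock (version 2)
    // A's release still expects version 0, so it fails.
    if (zk.compareAndSet("seg_0", -1L, 0)) {
      throw new IllegalStateException("upload A should not be able to release the lock");
    }
    return deleteSegmentIfNeeded(zk, "seg_0", startA, parallelPushProtectionEnabled);
  }
}
```

In this simulation the unprotected branch also removes the segment that upload B had already committed; the sketch only encodes the proposal and does not settle whether that trade-off is acceptable.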



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -261,6 +261,9 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se
   }
 
   private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) {
+    if (headers == null) {

Review Comment:
   Is this related?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
