You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/12/02 22:06:28 UTC

[pinot] 01/01: Fix race condition when 2 segment uploads occur for the same segment

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-segment-upload-race-condition
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit f8f6a9fdfe09686730e2409c20a631dda5256a79
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Dec 2 14:06:07 2022 -0800

    Fix race condition when 2 segment uploads occur for the same segment
---
 .../pinot/controller/api/upload/ZKOperator.java    | 34 ++++++++++++++++++----
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 3d33a358f3..57406dd131 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -261,6 +261,9 @@ public class ZKOperator {
   }
 
   private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) {
+    if (headers == null) {
+      return;
+    }
     String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
     if (expectedCrcStr != null) {
       long expectedCrc;
@@ -271,7 +274,7 @@ public class ZKOperator {
             String.format("Caught exception for segment: %s of table: %s while parsing IF-MATCH CRC: \"%s\"",
                 segmentName, tableNameWithType, expectedCrcStr), Response.Status.PRECONDITION_FAILED);
       }
-      if (expectedCrc != existingCrc) {
+      if (!isCRCMatched(expectedCrc, existingCrc)) {
         throw new ControllerApplicationException(LOGGER,
             String.format("For segment: %s of table: %s, expected CRC: %d does not match existing CRC: %d", segmentName,
                 tableNameWithType, expectedCrc, existingCrc), Response.Status.PRECONDITION_FAILED);
@@ -279,6 +282,10 @@ public class ZKOperator {
     }
   }
 
+  private boolean isCRCMatched(long expectedCRC, long existingCRC) {
+    return expectedCRC == existingCRC;
+  }
+
   private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType,
       @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr,
       String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes,
@@ -344,10 +351,27 @@ public class ZKOperator {
       // Release lock. Expected version will be 0 as we hold a lock and no updates could take place meanwhile.
       newSegmentZKMetadata.setSegmentUploadStartTime(-1);
       if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) {
-        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
-        LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);
-        throw new RuntimeException(
-            String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType));
+        // There is a race condition when the 1st segment upload takes too long to process (due to slow
+        // PinotFS access), which triggers a 2nd upload attempt for the same segment, and the 2nd upload succeeds.
+        // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it fails to
+        // update the ZK metadata. Instead, the 1st attempt should validate the CRC one more time. If the CRC
+        // remains the same, segment deletion should be skipped.
+        ZNRecord existingSegmentMetadataZNRecord =
+            _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+        // Check whether the CRC matches when the IF-MATCH header is set
+        SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord);
+        long existingCrc = segmentZKMetadata.getCrc();
+        try {
+          checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+          LOGGER.info("CRC is the same as the one in ZK. Skip updating the zk metadata for segment: " + segmentName);
+        } catch (ControllerApplicationException e) {
+          LOGGER.error("Failed to validate CRC for segment: " + segmentName, e);
+          _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
+          LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);
+          throw new RuntimeException(
+              String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile,
+                  tableNameWithType));
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org