You are viewing a plain-text version of this content. The hyperlink to the canonical version is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/12/02 22:06:28 UTC
[pinot] 01/01: Fix race condition when two segment uploads occur for the same segment
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch fix-segment-upload-race-condition
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit f8f6a9fdfe09686730e2409c20a631dda5256a79
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Dec 2 14:06:07 2022 -0800
Fix race condition when two segment uploads occur for the same segment
---
.../pinot/controller/api/upload/ZKOperator.java | 34 ++++++++++++++++++----
1 file changed, 29 insertions(+), 5 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 3d33a358f3..57406dd131 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -261,6 +261,9 @@ public class ZKOperator {
}
private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) {
+ if (headers == null) {
+ return;
+ }
String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
if (expectedCrcStr != null) {
long expectedCrc;
@@ -271,7 +274,7 @@ public class ZKOperator {
String.format("Caught exception for segment: %s of table: %s while parsing IF-MATCH CRC: \"%s\"",
segmentName, tableNameWithType, expectedCrcStr), Response.Status.PRECONDITION_FAILED);
}
- if (expectedCrc != existingCrc) {
+ if (!isCRCMatched(expectedCrc, existingCrc)) {
throw new ControllerApplicationException(LOGGER,
String.format("For segment: %s of table: %s, expected CRC: %d does not match existing CRC: %d", segmentName,
tableNameWithType, expectedCrc, existingCrc), Response.Status.PRECONDITION_FAILED);
@@ -279,6 +282,10 @@ public class ZKOperator {
}
}
+ private boolean isCRCMatched(long expectedCRC, long existingCRC) {
+ return expectedCRC == existingCRC;
+ }
+
private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType,
@Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr,
String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes,
@@ -344,10 +351,27 @@ public class ZKOperator {
// Release lock. Expected version will be 0 as we hold a lock and no updates could take place meanwhile.
newSegmentZKMetadata.setSegmentUploadStartTime(-1);
if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) {
- _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
- LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);
- throw new RuntimeException(
- String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType));
+ // There is a race condition when it took too much time for the 1st segment upload to process (due to slow
+ // PinotFS access), which leads to the 2nd attempt of segment upload, and the 2nd segment upload succeeded.
+ // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it failed to
+ // update the zk metadata. Instead, the 1st attempt should validate the crc one more time. If crc remains the
+ // same, segment deletion should be skipped.
+ ZNRecord existingSegmentMetadataZNRecord =
+ _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+ // Check if CRC match when IF-MATCH header is set
+ SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord);
+ long existingCrc = segmentZKMetadata.getCrc();
+ try {
+ checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+ LOGGER.info("CRC is the same as the one in ZK. Skip updating the zk metadata for segment: " + segmentName);
+ } catch (ControllerApplicationException e) {
+ LOGGER.error("Failed to validate CRC for segment: " + segmentName, e);
+ _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
+ LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);
+ throw new RuntimeException(
+ String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile,
+ tableNameWithType));
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org