Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/02/02 23:38:18 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #8110: Extend enableParallePushProtection support in UploadSegment API

Jackie-Jiang commented on a change in pull request #8110:
URL: https://github.com/apache/pinot/pull/8110#discussion_r798078236



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -255,6 +275,21 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment
       segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
       if (!_pinotHelixResourceManager
           .updateZkMetadata(tableNameWithType, segmentZKMetadata, segmentMetadataZnRecord.getVersion())) {
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
+        throw new RuntimeException(
+            "Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
+      }
+    }
+
+    if (enableParallelPushProtection) {
+      // Release lock.
+      ZNRecord segmentMetadataZnRecord =

Review comment:
       We should not read a new ZNRecord here, because it might not be the original one. We should reuse `newSegmentZKMetadata` instead.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -222,8 +222,16 @@ private void checkCRC(HttpHeaders headers, String offlineTableName, String segme
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType,
-      String segmentName, boolean moveSegmentToFinalLocation)
+      String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection)
       throws Exception {
+    SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager
+        .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter,
+            enableParallelPushProtection);

Review comment:
       Suggest not passing in `enableParallelPushProtection`. Instead, check it in this method and call `setSegmentUploadStartTime()` here, to limit the scope of parallel push protection.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -222,8 +222,16 @@ private void checkCRC(HttpHeaders headers, String offlineTableName, String segme
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType,
-      String segmentName, boolean moveSegmentToFinalLocation)
+      String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection)
       throws Exception {
+    SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager
+        .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter,
+            enableParallelPushProtection);
+    if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata)) {

Review comment:
       Expected version 0 (current version) won't work if there is no existing ZK record. I don't know whether ZK can create a record only if it does not already exist. IIRC, expected version -1 means overwrite regardless of the current version. If create-only-if-absent is not possible, there is still a race condition when 2 uploads happen at the same time and both of them reach this method.
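
       To make the concern concrete, here is a minimal in-memory sketch of the three write modes discussed above. `VersionedStore`, `create`, `set`, and `ANY_VERSION` are hypothetical names for illustration only; this is not the Helix or ZooKeeper API, and it only assumes the semantics described in this comment (a versioned write needs an existing record, and -1 matches any version).

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a versioned record store (not the Helix/ZK API). */
final class VersionedStore {
  /** Assumed convention from the comment above: -1 means "match any version". */
  static final int ANY_VERSION = -1;

  private static final class Entry {
    final String data;
    final int version;

    Entry(String data, int version) {
      this.data = data;
      this.version = version;
    }
  }

  private final ConcurrentHashMap<String, Entry> records = new ConcurrentHashMap<>();

  /** Create-only-if-absent: of several concurrent callers, exactly one gets true. */
  boolean create(String path, String data) {
    return records.putIfAbsent(path, new Entry(data, 0)) == null;
  }

  /** Versioned write: fails if the record is missing or the version does not match. */
  boolean set(String path, String data, int expectedVersion) {
    boolean[] updated = {false};
    // computeIfPresent runs atomically per key and does nothing for a missing record.
    records.computeIfPresent(path, (key, current) -> {
      if (expectedVersion == ANY_VERSION || current.version == expectedVersion) {
        updated[0] = true;
        return new Entry(data, current.version + 1);
      }
      return current;
    });
    return updated[0];
  }

  String get(String path) {
    Entry entry = records.get(path);
    return entry == null ? null : entry.data;
  }

  public static void main(String[] args) {
    VersionedStore store = new VersionedStore();
    // No record yet, so a write that expects version 0 cannot succeed.
    System.out.println("set with version 0 on missing record: " + store.set("seg", "uploadA", 0));
    // Two uploads race to create the record; only the first one wins.
    System.out.println("upload A create: " + store.create("seg", "uploadA"));
    System.out.println("upload B create: " + store.create("seg", "uploadB"));
    // Writing with ANY_VERSION silently replaces upload A's record.
    System.out.println("upload B set with -1: " + store.set("seg", "uploadB", ANY_VERSION));
  }
}
```

       In this model only `create` gives the mutual exclusion that parallel push protection needs; whether the store used here offers an equivalent call is the open question.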

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment
             .info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
                 finalSegmentLocationURI.getPath());
       } catch (Exception e) {
+        // Cleanup the Zk entry and the segment from the permanent directory if it exists.
         LOGGER
             .error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e);
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);

Review comment:
       Can we add some tests to ensure this can properly clean up the ZK entry and the segment file?
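
       As a rough illustration of the kind of test meant here, the sketch below simulates a failed segment move and checks that nothing is left behind. `FakeResourceManager`, `SegmentMover`, and this simplified `processNewSegment` are hypothetical stand-ins written for this example; a real test would exercise the actual `ZKOperator` against a test controller.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

final class CleanupOnFailureSketch {
  /** Hypothetical fake that tracks ZK entries and segment files in memory. */
  static final class FakeResourceManager {
    final Set<String> zkEntries = new HashSet<>();
    final Set<String> segmentFiles = new HashSet<>();

    void createZkEntry(String segmentName) {
      zkEntries.add(segmentName);
    }

    /** Mirrors the intent of deleteSegment: drop both the ZK entry and the file. */
    void deleteSegment(String segmentName) {
      zkEntries.remove(segmentName);
      segmentFiles.remove(segmentName);
    }
  }

  /** Hypothetical hook so the test can make the move step fail. */
  interface SegmentMover {
    void move(String segmentName) throws Exception;
  }

  /** Simplified stand-in for the upload flow: create the entry, move, clean up on failure. */
  static void processNewSegment(FakeResourceManager manager, SegmentMover mover, String segmentName) {
    manager.createZkEntry(segmentName);
    try {
      mover.move(segmentName);
      manager.segmentFiles.add(segmentName);
    } catch (Exception e) {
      manager.deleteSegment(segmentName);
      throw new RuntimeException(e);
    }
  }

  /** Returns true if a failed move leaves neither a ZK entry nor a segment file. */
  static boolean cleansUpWhenMoveFails() {
    FakeResourceManager manager = new FakeResourceManager();
    try {
      processNewSegment(manager, name -> {
        throw new IOException("simulated move failure");
      }, "seg_0");
      return false; // the failure should have propagated
    } catch (RuntimeException expected) {
      return manager.zkEntries.isEmpty() && manager.segmentFiles.isEmpty();
    }
  }
}
```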

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment
             .info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
                 finalSegmentLocationURI.getPath());
       } catch (Exception e) {
+        // Cleanup the Zk entry and the segment from the permanent directory if it exists.
         LOGGER
             .error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e);
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
         throw new RuntimeException(e);
       }
     } else {
       LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType,
           zkDownloadURI);
     }
-    _pinotHelixResourceManager.addNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter);
+
+    try {
+      _pinotHelixResourceManager.assignTableSegment(tableNameWithType, segmentMetadata.getName());
+    } catch (Exception e) {
+      // assignTableSegment removes the zk entry. Call deleteSegment to remove the segment from permanent location.
+      LOGGER
+          .error("Caught exception while calling assignTableSegment for adding segment: {} to table: {}", segmentName,
+              tableNameWithType, e);
+      _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
+      throw new RuntimeException(e);
+    }
 
     // Update zk metadata customer map
     String segmentZKMetadataCustomMapModifierStr = headers != null ? headers

Review comment:
       This custom map modification should be applied when creating the initial metadata.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


