Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/02/02 22:22:24 UTC

[GitHub] [pinot] jackjlli commented on a change in pull request #8110: Extend enableParallePushProtection support in UploadSegment API

jackjlli commented on a change in pull request #8110:
URL: https://github.com/apache/pinot/pull/8110#discussion_r798053687



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment
             .info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
                 finalSegmentLocationURI.getPath());
       } catch (Exception e) {
+        // Cleanup the Zk entry and the segment from the permanent directory if it exists.
         LOGGER
             .error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e);
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
         throw new RuntimeException(e);
       }
     } else {
       LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType,
           zkDownloadURI);
     }
-    _pinotHelixResourceManager.addNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter);
+
+    try {
+      _pinotHelixResourceManager.assignTableSegment(tableNameWithType, segmentMetadata.getName());
+    } catch (Exception e) {
+      // assignTableSegment removes the zk entry. Call deleteSegment to remove the segment from permanent location.
+      LOGGER
+          .error("Caught exception while calling assignTableSegment for adding segment: {} to table: {}", segmentName,
+              tableNameWithType, e);
+      _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
+      throw new RuntimeException(e);

Review comment:
       The original exception can be re-thrown to the top instead of being wrapped in a `RuntimeException`.
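
       A minimal sketch of what the catch block could look like with that change; it mirrors the lines shown in the diff above and is illustrative only:

       ```java
       } catch (Exception e) {
         // Clean up the ZK entry and the segment in the permanent directory if it exists.
         LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName,
             tableNameWithType, e);
         _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
         // Re-throw the original exception (processNewSegment already declares "throws Exception"),
         // preserving its type and stack trace instead of wrapping it in a RuntimeException.
         throw e;
       }
       ```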

##########
File path: pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
##########
@@ -64,11 +64,34 @@ public void testCompleteSegmentOperations()
     when(segmentMetadata.getCrc()).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
     HttpHeaders httpHeaders = mock(HttpHeaders.class);
+
+    // Test if Zk segment metadata is removed if exception is thrown when moving segment to final location.
+    try {
+      // Create mock finalSegmentLocationURI and currentSegmentLocation.
+      URI finalSegmentLocationURI =
+          URIUtils.getUri("mockPath", OFFLINE_TABLE_NAME, URIUtils.encode(segmentMetadata.getName()));
+      File currentSegmentLocation = new File(new File("foo/bar"), "mockChild");
+
+      zkOperator
+          .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI,
+              currentSegmentLocation, true, httpHeaders, "downloadUrl",
+              true, "crypter");
+      fail();
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Add a tiny sleep to give some time for the segment Zk entry to be deleted.
+    Thread.sleep(5000L);

Review comment:
       We can use the `waitForCondition` method to validate this instead of sleeping for a fixed amount of time.
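
       A rough sketch of what that could look like, assuming the `TestUtils.waitForCondition(Function<Void, Boolean>, long, String)` helper from `org.apache.pinot.util.TestUtils`; `fetchSegmentZKMetadata` is a hypothetical placeholder for whatever accessor the test uses to read the segment ZK metadata:

       ```java
       // Wait until the segment ZK entry has been cleaned up instead of sleeping a fixed 5 seconds.
       TestUtils.waitForCondition(
           aVoid -> fetchSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentMetadata.getName()) == null,
           10_000L, "Segment ZK metadata was not deleted after the failed upload");
       ```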

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment
             .info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
                 finalSegmentLocationURI.getPath());
       } catch (Exception e) {
+        // Cleanup the Zk entry and the segment from the permanent directory if it exists.
         LOGGER
             .error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e);
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);

Review comment:
       Add a log message to show that the newly uploaded segment is being deleted.
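
       For illustration, the cleanup could be made visible in the logs with something like the following (a sketch, not required wording):

       ```java
       LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName,
           tableNameWithType, e);
       // Log the cleanup explicitly before deleting the newly uploaded segment.
       LOGGER.info("Deleting newly uploaded segment: {} from table: {}", segmentName, tableNameWithType);
       _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
       ```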

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment
             .info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
                 finalSegmentLocationURI.getPath());
       } catch (Exception e) {
+        // Cleanup the Zk entry and the segment from the permanent directory if it exists.
         LOGGER
             .error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e);
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
         throw new RuntimeException(e);
       }
     } else {
       LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType,
           zkDownloadURI);
     }
-    _pinotHelixResourceManager.addNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter);
+
+    try {
+      _pinotHelixResourceManager.assignTableSegment(tableNameWithType, segmentMetadata.getName());
+    } catch (Exception e) {
+      // assignTableSegment removes the zk entry. Call deleteSegment to remove the segment from permanent location.
+      LOGGER
+          .error("Caught exception while calling assignTableSegment for adding segment: {} to table: {}", segmentName,
+              tableNameWithType, e);
+      _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);

Review comment:
       Same here.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
##########
@@ -170,9 +170,26 @@ public void setUp()
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Create and upload segments
+    // Create and upload segments. For exhaustive testing, concurrently upload multiple segments with the same name
+    // and validate correctness with parallel push protection enabled.
     ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
-    uploadSegments(getTableName(), _tarDir);
+    // Create a copy of _tarDir to create multiple segments with the same name.
+    File tarDir2 = new File(_tempDir, "tarDir2");
+    FileUtils.copyDirectory(_tarDir, tarDir2);
+
+    List<File> tarDirPaths = new ArrayList<>();
+    tarDirPaths.add(_tarDir);
+    tarDirPaths.add(tarDir2);
+
+    try {
+      uploadSegments(getTableName(), tarDirPaths, TableType.OFFLINE, true);

Review comment:
       Add `Assert.fail()` right after this line, since we expect an exception to be thrown here?
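
       Roughly like this (a sketch, assuming the test wraps the call in a try/catch that treats the exception as expected):

       ```java
       try {
         uploadSegments(getTableName(), tarDirPaths, TableType.OFFLINE, true);
         // One of the duplicate concurrent uploads should be rejected by parallel push protection,
         // so reaching this line means the protection did not kick in.
         Assert.fail("Expected an exception from the duplicate concurrent segment upload");
       } catch (Exception e) {
         // Expected.
       }
       ```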

##########
File path: pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
##########
@@ -133,4 +158,4 @@ public void testCompleteSegmentOperations()
   public void tearDown() {
     ControllerTestUtils.cleanup();
   }
-}
+}

Review comment:
       Missing newline at the end of the file.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -222,8 +222,16 @@ private void checkCRC(HttpHeaders headers, String offlineTableName, String segme
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType,
-      String segmentName, boolean moveSegmentToFinalLocation)
+      String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection)
       throws Exception {
+    SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager
+        .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter,
+            enableParallelPushProtection);
+    if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata)) {

Review comment:
       Since this is processing a new segment, why not update the Zk metadata with an expected version like `0` here? E.g., if two controllers try to upload the same new segment, only one controller should succeed.
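
       As an illustration of that idea (a sketch only, not the required implementation): the new segment's ZK node could be written with a call that fails when another controller gets there first, e.g. Helix's `create`, or the `set` overload that takes an expected version. Here `segmentZKMetadata` stands for the metadata constructed above:

       ```java
       // Only one of the racing controllers can create the node for a brand-new segment;
       // the loser gets "false" back and its upload can be rejected.
       String segmentZKMetadataPath =
           ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
       if (!_propertyStore.create(segmentZKMetadataPath, segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT)) {
         throw new RuntimeException(
             "Failed to create ZK metadata for new segment: " + segmentName + ", it may already exist");
       }
       ```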

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1778,48 +1778,64 @@ public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetad
 
   public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, String downloadUrl,
       @Nullable String crypter) {
-    String segmentName = segmentMetadata.getName();
-    InstancePartitionsType instancePartitionsType;
     // NOTE: must first set the segment ZK metadata before assigning segment to instances because segment assignment
     // might need them to determine the partition of the segment, and server will need them to download the segment
-    ZNRecord znRecord;
+    SegmentZKMetadata segmentZkmetadata =
+        constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, downloadUrl, crypter,
+            false);
+    ZNRecord znRecord = segmentZkmetadata.toZNRecord();
+
+    String segmentName = segmentMetadata.getName();
+    String segmentZKMetadataPath =
+        ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
+    Preconditions.checkState(_propertyStore.set(segmentZKMetadataPath, znRecord, AccessOption.PERSISTENT),
+        "Failed to set segment ZK metadata for table: " + tableNameWithType + ", segment: " + segmentName);
+    LOGGER.info("Added segment: {} of table: {} to property store", segmentName, tableNameWithType);
+
+    assignTableSegment(tableNameWithType, segmentName);
+  }
+
+  public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,

Review comment:
       Add a javadoc for this method.
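
       Something along these lines, based on the parameters and return type visible at the call sites in this diff (wording is just a suggestion):

       ```java
       /**
        * Constructs the segment ZK metadata for a new segment of the given table. The returned metadata is
        * not yet persisted to the property store; the caller is responsible for writing it.
        *
        * @param tableNameWithType table name with type suffix
        * @param segmentMetadata metadata of the new segment
        * @param downloadUrl download URL to record in the segment ZK metadata
        * @param crypter name of the crypter used for the segment, or null if the segment is not encrypted
        * @param enableParallelPushProtection whether to mark the segment for parallel push protection
        * @return the constructed SegmentZKMetadata
        */
       public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
       ```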

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
##########
@@ -170,9 +170,26 @@ public void setUp()
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Create and upload segments
+    // Create and upload segments. For exhaustive testing, concurrently upload multiple segments with the same name
+    // and validate correctness with parallel push protection enabled.
     ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
-    uploadSegments(getTableName(), _tarDir);
+    // Create a copy of _tarDir to create multiple segments with the same name.
+    File tarDir2 = new File(_tempDir, "tarDir2");
+    FileUtils.copyDirectory(_tarDir, tarDir2);
+
+    List<File> tarDirPaths = new ArrayList<>();

Review comment:
       Could we test this in a dedicated test method instead of in the `setUp` method?
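
       For example, the duplicate-upload check could live in its own test method, roughly like this (method and variable names are illustrative, and it assumes the segments built in `setUp` are still available in `_tarDir`):

       ```java
       @Test
       public void testUploadSameSegmentsWithParallelPushProtection()
           throws Exception {
         // Copy the tar directory so two uploads race on segments with the same names.
         File tarDir2 = new File(_tempDir, "tarDir2");
         FileUtils.copyDirectory(_tarDir, tarDir2);
         List<File> tarDirPaths = Arrays.asList(_tarDir, tarDir2);
         try {
           uploadSegments(getTableName(), tarDirPaths, TableType.OFFLINE, true);
           Assert.fail("Expected one of the duplicate uploads to be rejected");
         } catch (Exception e) {
           // Expected with parallel push protection enabled.
         }
       }
       ```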

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -810,6 +810,33 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmen
     return uploadSegment(uri, segmentName, segmentFile, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+  /**

Review comment:
       You might not need this new API, as any new parameter can be passed in via `List<NameValuePair> parameters`, and then you can leverage the existing API on line 856.
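
       For illustration, a caller could pass the flag through the existing parameter list, roughly like this (it assumes a query-parameter constant such as `FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION`; otherwise the raw parameter name string would be used):

       ```java
       // Reuse the existing uploadSegment overload that accepts headers and query parameters.
       List<NameValuePair> parameters = Collections.singletonList(
           new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true"));
       fileUploadDownloadClient.uploadSegment(uri, segmentName, segmentFile, /* headers */ null, parameters,
           60_000 /* socketTimeoutMs */);
       ```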




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


