You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/02/07 19:18:31 UTC

[pinot] branch master updated: Add allowRefresh option to UploadSegment (#8125)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ac0daf  Add allowRefresh option to UploadSegment (#8125)
5ac0daf is described below

commit 5ac0daf06d3bf6ebedd45cd2d315b46831caa777
Author: Vivek Iyer Vaidyanathan Iyer <vv...@gmail.com>
AuthorDate: Tue Feb 8 00:48:14 2022 +0530

    Add allowRefresh option to UploadSegment (#8125)
    
    * Add overwriteIfExists option to UploadSegment
    
    Currently, when a segment is uploaded, we always overwrite it if a
    segment with the same name already exists. Adding an overwrite
    parameter to the UploadSegment API gives clients finer control
    when uploading segments.
    
    Note that the option defaults to true to retain the existing
    behavior.
    
    * Address minor review comments from PR #8110
    
    - Improve description for a method.
    - Log an error message when an exception is caught while creating
      segment ZK metadata.
    
    * Rename overwriteIfExists to allowRefresh
    
    Co-authored-by: Vivek Iyer Vaidyanathan <vv...@vvaidyan-mn1.linkedin.biz>
---
 .../pinot/common/metadata/ZKMetadataProvider.java  |  1 +
 .../common/utils/FileUploadDownloadClient.java     | 13 ++++---
 .../PinotSegmentUploadDownloadRestletResource.java | 41 +++++++++++++---------
 .../pinot/controller/api/upload/ZKOperator.java    | 14 +++++++-
 .../helix/core/PinotHelixResourceManager.java      | 12 +++----
 .../controller/api/upload/ZKOperatorTest.java      | 19 +++++++---
 .../pinot/integration/tests/ClusterTest.java       |  4 +--
 7 files changed, 70 insertions(+), 34 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 095c0f4..501a72c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -157,6 +157,7 @@ public class ZKMetadataProvider {
           .create(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
               segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT);
     } catch (Exception e) {
+      LOGGER.error("Caught exception while creating segmentZkMetadata for table: {}", tableNameWithType, e);
       return false;
     }
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 470129c..0cb747c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -98,6 +98,7 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   public static class QueryParameters {
+    public static final String ALLOW_REFRESH = "allowRefresh";
     public static final String ENABLE_PARALLEL_PUSH_PROTECTION = "enableParallelPushProtection";
     public static final String TABLE_NAME = "tableName";
     public static final String TABLE_TYPE = "tableType";
@@ -811,7 +812,8 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Upload segment with segment file using  table name, type and enableParallelPushProtection as a request parameters.
+   * Upload segment with segment file using  table name, type, enableParallelPushProtection and allowRefresh as
+   * request parameters.
    *
    * @param uri URI
    * @param segmentName Segment name
@@ -819,21 +821,24 @@ public class FileUploadDownloadClient implements Closeable {
    * @param tableName Table name with or without type suffix
    * @param tableType Table type
    * @param enableParallelPushProtection enable protection against concurrent segment uploads for the same segment
+   * @param allowRefresh whether to refresh a segment if it already exists
    * @return Response
    * @throws IOException
    * @throws HttpErrorStatusException
    */
   public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String tableName,
-      TableType tableType, boolean enableParallelPushProtection)
+      TableType tableType, boolean enableParallelPushProtection, boolean allowRefresh)
       throws IOException, HttpErrorStatusException {
     NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, tableName);
     NameValuePair tableTypeValuePair = new BasicNameValuePair(QueryParameters.TABLE_TYPE, tableType.name());
     NameValuePair enableParallelPushProtectionValuePair =
         new BasicNameValuePair(QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
             String.valueOf(enableParallelPushProtection));
+    NameValuePair allowRefreshValuePair =
+        new BasicNameValuePair(QueryParameters.ALLOW_REFRESH, String.valueOf(allowRefresh));
 
-    List<NameValuePair> parameters =
-        Arrays.asList(tableNameValuePair, tableTypeValuePair, enableParallelPushProtectionValuePair);
+    List<NameValuePair> parameters = Arrays
+        .asList(tableNameValuePair, tableTypeValuePair, enableParallelPushProtectionValuePair, allowRefreshValuePair);
     return uploadSegment(uri, segmentName, segmentFile, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 33a1d55..484bc30 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -187,7 +187,8 @@ public class PinotSegmentUploadDownloadRestletResource {
   }
 
   private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType, FormDataMultiPart multiPart,
-      boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation) {
+      boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation,
+      boolean allowRefresh) {
     String uploadTypeStr = null;
     String crypterClassNameInHeader = null;
     String downloadUri = null;
@@ -301,7 +302,7 @@ public class PinotSegmentUploadDownloadRestletResource {
 
       // Zk operations
       completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, tableNameWithType, segmentMetadata,
-          segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName);
+          segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh);
 
       return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
     } catch (WebApplicationException e) {
@@ -391,7 +392,7 @@ public class PinotSegmentUploadDownloadRestletResource {
 
   private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile,
       String tableNameWithType, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation, String crypter)
+      boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh)
       throws Exception {
     String basePath = ControllerFilePathProvider.getInstance().getDataDirURI().toString();
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -399,7 +400,7 @@ public class PinotSegmentUploadDownloadRestletResource {
     ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
     zkOperator
         .completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, uploadedSegmentFile,
-            enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter);
+            enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter, allowRefresh);
   }
 
   private void decryptFile(String crypterClassName, File tempEncryptedFile, File tempDecryptedFile) {
@@ -431,12 +432,14 @@ public class PinotSegmentUploadDownloadRestletResource {
       @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
-          boolean enableParallelPushProtection, @Context HttpHeaders headers, @Context Request request,
-      @Suspended final AsyncResponse asyncResponse) {
+          boolean enableParallelPushProtection,
+      @ApiParam(value = "Whether to refresh if the segment already exists") @DefaultValue("true")
+      @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) boolean allowRefresh,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(
           uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), null, enableParallelPushProtection,
-              headers, request, false));
+              headers, request, false, allowRefresh));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -462,12 +465,14 @@ public class PinotSegmentUploadDownloadRestletResource {
       @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
-          boolean enableParallelPushProtection, @Context HttpHeaders headers, @Context Request request,
-      @Suspended final AsyncResponse asyncResponse) {
+          boolean enableParallelPushProtection,
+      @ApiParam(value = "Whether to refresh if the segment already exists") @DefaultValue("true")
+      @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) boolean allowRefresh,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(
           uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, enableParallelPushProtection,
-              headers, request, true));
+              headers, request, true, allowRefresh));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -495,12 +500,14 @@ public class PinotSegmentUploadDownloadRestletResource {
       @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
-          boolean enableParallelPushProtection, @Context HttpHeaders headers, @Context Request request,
-      @Suspended final AsyncResponse asyncResponse) {
+          boolean enableParallelPushProtection,
+      @ApiParam(value = "Whether to refresh if the segment already exists") @DefaultValue("true")
+      @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) boolean allowRefresh,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(
           uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), null, enableParallelPushProtection,
-              headers, request, true));
+              headers, request, true, allowRefresh));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -526,12 +533,14 @@ public class PinotSegmentUploadDownloadRestletResource {
       @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
-          boolean enableParallelPushProtection, @Context HttpHeaders headers, @Context Request request,
-      @Suspended final AsyncResponse asyncResponse) {
+          boolean enableParallelPushProtection,
+      @ApiParam(value = "Whether to refresh if the segment already exists") @DefaultValue("true")
+      @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) boolean allowRefresh,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(
           uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, enableParallelPushProtection,
-              headers, request, true));
+              headers, request, true, allowRefresh));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 17d549f..693d42f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -60,7 +60,8 @@ public class ZKOperator {
 
   public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
-      HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter)
+      HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter,
+      boolean allowRefresh)
       throws Exception {
     String segmentName = segmentMetadata.getName();
     ZNRecord segmentMetadataZNRecord =
@@ -79,6 +80,17 @@ public class ZKOperator {
       return;
     }
 
+    // We reach here if a segment with the same name already exists.
+
+    if (!allowRefresh) {
+      // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the check
+      // done up-front but ends up getting created before the check here, we could incorrectly refresh an existing
+      // segment.
+      throw new ControllerApplicationException(LOGGER,
+          "Segment: " + segmentName + " already exists in table: " + tableNameWithType + ". Refresh not permitted.",
+          Response.Status.CONFLICT);
+    }
+
     // TODO Allow segment refreshing for realtime tables.
     if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
       throw new ControllerApplicationException(LOGGER,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 9445134..9e36e19 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1796,13 +1796,13 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Construct segmentZkMetadata for the realtime or offline table.
+   * Construct segmentZkMetadata for new segment of offline or realtime table.
    *
-   * @param tableNameWithType
-   * @param segmentMetadata
-   * @param downloadUrl
-   * @param crypter
-   * @return
+   * @param tableNameWithType Table name with type
+   * @param segmentMetadata Segment metadata
+   * @param downloadUrl Download URL
+   * @param crypter Crypter
+   * @return SegmentZkMetadata of the input segment
    */
   public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
       String downloadUrl, @Nullable String crypter) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 50f9137..b701258 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -78,7 +78,7 @@ public class ZKOperatorTest {
       zkOperator
           .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI,
               currentSegmentLocation, true, httpHeaders, "downloadUrl",
-              true, "crypter");
+              true, "crypter", true);
       fail();
     } catch (Exception e) {
       // Expected
@@ -94,7 +94,7 @@ public class ZKOperatorTest {
 
     zkOperator
         .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, "downloadUrl",
-            false, "crypter");
+            false, "crypter", true);
 
     SegmentZKMetadata segmentZKMetadata =
         ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
@@ -108,11 +108,20 @@ public class ZKOperatorTest {
     assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
     assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
 
+    // Upload the same segment with allowRefresh = false. Validate that an exception is thrown.
+    try {
+      zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders,
+          "otherDownloadUrl", false, "otherCrypter", false);
+      fail();
+    } catch (Exception e) {
+      // Expected
+    }
+
     // Refresh the segment with unmatched IF_MATCH field
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
     try {
       zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
-          "otherDownloadUrl", false, null);
+          "otherDownloadUrl", false, null, true);
       fail();
     } catch (Exception e) {
       // Expected
@@ -123,7 +132,7 @@ public class ZKOperatorTest {
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
-        "otherDownloadUrl", false, "otherCrypter");
+        "otherDownloadUrl", false, "otherCrypter", true);
     segmentZKMetadata =
         ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertEquals(segmentZKMetadata.getCrc(), 12345L);
@@ -146,7 +155,7 @@ public class ZKOperatorTest {
     // not found!" exception from being thrown sporadically.
     Thread.sleep(1000L);
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders,
-        "otherDownloadUrl", false, "otherCrypter");
+        "otherDownloadUrl", false, "otherCrypter", true);
     segmentZKMetadata =
         ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertEquals(segmentZKMetadata.getCrc(), 23456L);
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 1a90523..5f6364f 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -449,7 +449,7 @@ public abstract class ClusterTest extends ControllerTest {
         File segmentTarFile = segmentTarFiles.get(0);
         assertEquals(fileUploadDownloadClient
                 .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName,
-                    tableType.OFFLINE, enableParallelPushProtection)
+                    tableType.OFFLINE, enableParallelPushProtection, true)
                 .getStatusCode(),
             HttpStatus.SC_OK);
       } else {
@@ -460,7 +460,7 @@ public abstract class ClusterTest extends ControllerTest {
           futures.add(executorService.submit(() -> {
             return fileUploadDownloadClient
                 .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName,
-                    tableType.OFFLINE, enableParallelPushProtection)
+                    tableType.OFFLINE, enableParallelPushProtection, true)
                 .getStatusCode();
           }));
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org