You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/02/02 18:49:07 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #8078: Segment retention for table

Jackie-Jiang commented on a change in pull request #8078:
URL: https://github.com/apache/pinot/pull/8078#discussion_r797888748



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -703,7 +703,9 @@ public synchronized PinotResourceManagerResponse deleteSegments(String tableName
       Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
           "Table name: %s is not a valid table name with type suffix", tableNameWithType);
       HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
-      _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames);
+      TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      long retentionMs = _segmentDeletionManager.getRetentionFromTableConfig(tableConfig);
+      _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, retentionMs);

Review comment:
       Let's just pass the `tableConfig` into this method. Calling `_segmentDeletionManager.getRetentionFromTableConfig(tableConfig)` and then passing the result in is slightly confusing.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
##########
@@ -60,15 +62,15 @@
   private final String _helixClusterName;
   private final HelixAdmin _helixAdmin;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final int _defaultDeletedSegmentsRetentionInDays;
+  private final long _defaultDeletedSegmentsRetentionMillis;

Review comment:
       (minor)
   ```suggestion
     private final long _defaultDeletedSegmentsRetentionMs;
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
##########
@@ -84,22 +86,28 @@ public void stop() {
     _executorService.shutdownNow();
   }
 
-  public void deleteSegments(final String tableName, final Collection<String> segmentIds) {
-    deleteSegmentsWithDelay(tableName, segmentIds, DEFAULT_DELETION_DELAY_SECONDS);
+  public void deleteSegments(String tableName, Collection<String> segmentIds) {
+    deleteSegments(tableName, segmentIds, _defaultDeletedSegmentsRetentionMillis);
   }
 
-  protected void deleteSegmentsWithDelay(final String tableName, final Collection<String> segmentIds,
-      final long deletionDelaySeconds) {
+  public void deleteSegments(String tableName, Collection<String> segmentIds,
+      long deletedSegmentsRetentionMillis) {

Review comment:
       (minor, same for other places)
   ```suggestion
         long deletedSegmentsRetentionMs) {
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
##########
@@ -222,60 +236,74 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId
   }
 
   /**
-   * Removes aged deleted segments from the deleted directory
-   * @param retentionInDays: retention for deleted segments in days
+   * Removes aged deleted segments from the deleted directory for a specific table.
+   * @param tableConfig: config for the table that needs to be deleted.
    */
-  public void removeAgedDeletedSegments(int retentionInDays) {
+  public void removeAgedDeletedSegments(TableConfig tableConfig) {
     if (_dataDir != null) {
-      URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
-      PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
+      String tableNameWithType = tableConfig.getTableName();
+      URI tableDeletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, tableNameWithType);
+      PinotFS pinotFS = PinotFSFactory.create(tableDeletedDirURI.getScheme());
 
       try {
         // Directly return when the deleted directory does not exist (no segment deleted yet)
-        if (!pinotFS.exists(deletedDirURI)) {
+        if (!pinotFS.exists(tableDeletedDirURI)) {
           return;
         }
 
-        if (!pinotFS.isDirectory(deletedDirURI)) {
-          LOGGER.warn("Deleted segments URI: {} is not a directory", deletedDirURI);
+        if (!pinotFS.isDirectory(tableDeletedDirURI)) {
+          LOGGER.warn("Deleted segments URI: {} is not a directory", tableDeletedDirURI);
           return;
         }
 
-        String[] tableNameDirs = pinotFS.listFiles(deletedDirURI, false);
-        if (tableNameDirs == null) {
-          LOGGER.warn("Failed to list files from the deleted segments directory: {}", deletedDirURI);
-          return;
-        }
+        long retentionMs = getRetentionFromTableConfig(tableConfig);
 
-        for (String tableNameDir : tableNameDirs) {
-          URI tableNameURI = URIUtils.getUri(tableNameDir);
-          // Get files that are aged
-          final String[] targetFiles = pinotFS.listFiles(tableNameURI, false);
-          int numFilesDeleted = 0;
-          for (String targetFile : targetFiles) {
-            URI targetURI = URIUtils.getUri(targetFile);
-            Date dateToDelete = DateTime.now().minusDays(retentionInDays).toDate();
-            if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
-              if (!pinotFS.delete(targetURI, true)) {
-                LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString());
-              } else {
-                numFilesDeleted++;
-              }
+        // Get files that are aged
+        final String[] targetFiles = pinotFS.listFiles(tableDeletedDirURI, false);
+        int numFilesDeleted = 0;
+        for (String targetFile : targetFiles) {
+          URI targetURI = URIUtils.getUri(targetFile);
+          Date dateToDelete = DateTime.now().minus(retentionMs).toDate();
+          if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
+            if (!pinotFS.delete(targetURI, true)) {
+              LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString());
+            } else {
+              numFilesDeleted++;
             }
           }
+        }
 
-          if (numFilesDeleted == targetFiles.length) {
-            // Delete directory if it's empty
-            if (!pinotFS.delete(tableNameURI, false)) {
-              LOGGER.warn("The directory {} cannot be removed.", tableNameDir);
-            }
+        if (numFilesDeleted == targetFiles.length) {
+          // Delete directory if it's empty
+          if (!pinotFS.delete(tableDeletedDirURI, false)) {
+            LOGGER.warn("The directory {} cannot be removed.", tableDeletedDirURI);
           }
         }
       } catch (IOException e) {
-        LOGGER.error("Had trouble deleting directories: {}", deletedDirURI.toString(), e);
+        LOGGER.error("Had trouble deleting directories: {}", tableDeletedDirURI.toString(), e);
       }
     } else {
       LOGGER.info("dataDir is not configured, won't delete any expired segments from deleted directory.");
     }
   }
+
+  public long getRetentionFromTableConfig(TableConfig tableConfig) {
+    long retentionMs = _defaultDeletedSegmentsRetentionMillis;
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+      if (validationConfig.getDeletedSegmentRetentionTimeUnit() != null

Review comment:
       Can be simplified with `StringUtils.isEmpty()`

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
##########
@@ -29,6 +29,8 @@
 public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
   private String _retentionTimeUnit;
   private String _retentionTimeValue;
+  private String _deletedSegmentRetentionTimeUnit;

Review comment:
       Suggest changing it to `_deletedSegmentRetentionPeriod` for easier configuration. The period can be parsed using `TimeUtils.convertPeriodToMillis()`.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
##########
@@ -222,60 +236,74 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId
   }
 
   /**
-   * Removes aged deleted segments from the deleted directory
-   * @param retentionInDays: retention for deleted segments in days
+   * Removes aged deleted segments from the deleted directory for a specific table.
+   * @param tableConfig: config for the table that needs to be deleted.
    */
-  public void removeAgedDeletedSegments(int retentionInDays) {
+  public void removeAgedDeletedSegments(TableConfig tableConfig) {
     if (_dataDir != null) {
-      URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
-      PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
+      String tableNameWithType = tableConfig.getTableName();
+      URI tableDeletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, tableNameWithType);
+      PinotFS pinotFS = PinotFSFactory.create(tableDeletedDirURI.getScheme());
 
       try {
         // Directly return when the deleted directory does not exist (no segment deleted yet)
-        if (!pinotFS.exists(deletedDirURI)) {
+        if (!pinotFS.exists(tableDeletedDirURI)) {
           return;
         }
 
-        if (!pinotFS.isDirectory(deletedDirURI)) {
-          LOGGER.warn("Deleted segments URI: {} is not a directory", deletedDirURI);
+        if (!pinotFS.isDirectory(tableDeletedDirURI)) {
+          LOGGER.warn("Deleted segments URI: {} is not a directory", tableDeletedDirURI);
           return;
         }
 
-        String[] tableNameDirs = pinotFS.listFiles(deletedDirURI, false);
-        if (tableNameDirs == null) {
-          LOGGER.warn("Failed to list files from the deleted segments directory: {}", deletedDirURI);
-          return;
-        }
+        long retentionMs = getRetentionFromTableConfig(tableConfig);
 
-        for (String tableNameDir : tableNameDirs) {
-          URI tableNameURI = URIUtils.getUri(tableNameDir);
-          // Get files that are aged
-          final String[] targetFiles = pinotFS.listFiles(tableNameURI, false);
-          int numFilesDeleted = 0;
-          for (String targetFile : targetFiles) {
-            URI targetURI = URIUtils.getUri(targetFile);
-            Date dateToDelete = DateTime.now().minusDays(retentionInDays).toDate();
-            if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
-              if (!pinotFS.delete(targetURI, true)) {
-                LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString());
-              } else {
-                numFilesDeleted++;
-              }
+        // Get files that are aged
+        final String[] targetFiles = pinotFS.listFiles(tableDeletedDirURI, false);
+        int numFilesDeleted = 0;
+        for (String targetFile : targetFiles) {
+          URI targetURI = URIUtils.getUri(targetFile);
+          Date dateToDelete = DateTime.now().minus(retentionMs).toDate();
+          if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
+            if (!pinotFS.delete(targetURI, true)) {
+              LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString());
+            } else {
+              numFilesDeleted++;
             }
           }
+        }
 
-          if (numFilesDeleted == targetFiles.length) {
-            // Delete directory if it's empty
-            if (!pinotFS.delete(tableNameURI, false)) {
-              LOGGER.warn("The directory {} cannot be removed.", tableNameDir);
-            }
+        if (numFilesDeleted == targetFiles.length) {
+          // Delete directory if it's empty
+          if (!pinotFS.delete(tableDeletedDirURI, false)) {
+            LOGGER.warn("The directory {} cannot be removed.", tableDeletedDirURI);
           }
         }
       } catch (IOException e) {
-        LOGGER.error("Had trouble deleting directories: {}", deletedDirURI.toString(), e);
+        LOGGER.error("Had trouble deleting directories: {}", tableDeletedDirURI.toString(), e);
       }
     } else {
       LOGGER.info("dataDir is not configured, won't delete any expired segments from deleted directory.");
     }
   }
+
+  public long getRetentionFromTableConfig(TableConfig tableConfig) {
+    long retentionMs = _defaultDeletedSegmentsRetentionMillis;
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+      if (validationConfig.getDeletedSegmentRetentionTimeUnit() != null
+          && !validationConfig.getDeletedSegmentRetentionTimeUnit().isEmpty()
+          && validationConfig.getDeletedSegmentRetentionTimeValue() != null
+          && !validationConfig.getDeletedSegmentRetentionTimeValue().isEmpty()) {
+        try {
+          retentionMs = TimeUnit.valueOf(validationConfig.getDeletedSegmentRetentionTimeUnit().toUpperCase())
+              .toMillis(Long.parseLong(validationConfig.getDeletedSegmentRetentionTimeValue()));
+        } catch (Exception e) {
+          LOGGER.warn(String.format("Unable to parse deleted segment retention config for table %s, using to default",

Review comment:
       Put the default value in the log




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org