You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2023/02/07 04:24:44 UTC

[pinot] 01/01: Fix race condition on removing aged deleted segments

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-remove-aged-deleted-segments
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 187d0a669f1eb58e4c730f1be9d03491bb767987
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Mon Feb 6 20:24:22 2023 -0800

    Fix race condition on removing aged deleted segments
---
 .../org/apache/pinot/common/utils/URIUtils.java    |  8 +++++
 .../apache/pinot/common/utils/URIUtilsTest.java    |  7 ++++
 .../helix/core/SegmentDeletionManager.java         | 40 ++++++++++++----------
 .../helix/core/retention/RetentionManager.java     |  2 +-
 .../helix/core/retention/RetentionManagerTest.java |  6 ++--
 .../core/util/SegmentDeletionManagerTest.java      |  9 ++---
 6 files changed, 46 insertions(+), 26 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
index 042427b772..56902f8bf5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
@@ -70,6 +70,14 @@ public class URIUtils {
     return stringJoiner.toString();
   }
 
+  /**
+   * Returns the part of the given path after its last separator; '/' is always recognized (URIs use it on every
+   * platform) in addition to the platform file separator. Returns the whole path if no separator is found.
+   */
+  public static String getLastPart(String path) {
+    return path.substring(Math.max(path.lastIndexOf('/'), path.lastIndexOf(File.separatorChar)) + 1);
+  }
+
   /**
    * Returns the download URL with the segment name encoded.
    */
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
index 4cec446067..b498ce9b0f 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
@@ -52,6 +52,13 @@ public class URIUtilsTest {
     assertEquals(URIUtils.getPath("file:/foo/bar", "table", "segment+%25"), "file:/foo/bar/table/segment+%25");
   }
 
+  @Test
+  public void testGetLastPart() {
+    for (String path : new String[]{"http://foo/bar", "/foo/bar", "file:/foo/bar"}) {
+      assertEquals(URIUtils.getLastPart(path), "bar"); // scheme prefix must not affect the last part
+    }
+  }
+
   @Test
   public void testConstructDownloadUrl() {
     assertEquals(URIUtils.constructDownloadUrl("http://foo/bar", "table", "segment"),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index ff74bea708..ecc7d337e8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -44,6 +44,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -264,7 +265,7 @@ public class SegmentDeletionManager {
   /**
    * Removes aged deleted segments from the deleted directory
    */
-  public void removeAgedDeletedSegments() {
+  public void removeAgedDeletedSegments(LeadControllerManager leadControllerManager) {
     if (_dataDir != null) {
       URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
       PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
@@ -287,26 +288,29 @@ public class SegmentDeletionManager {
         }
 
         for (String tableNameDir : tableNameDirs) {
-          URI tableNameURI = URIUtils.getUri(tableNameDir);
-          // Get files that are aged
-          final String[] targetFiles = pinotFS.listFiles(tableNameURI, false);
-          int numFilesDeleted = 0;
-          for (String targetFile : targetFiles) {
-            URI targetURI = URIUtils.getUri(targetFile);
-            long deletionTimeMs = getDeletionTimeMsFromFile(targetFile, pinotFS.lastModified(targetURI));
-            if (System.currentTimeMillis() >= deletionTimeMs) {
-              if (!pinotFS.delete(targetURI, true)) {
-                LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString());
-              } else {
-                numFilesDeleted++;
+          String tableName = URIUtils.getLastPart(tableNameDir);
+          if (leadControllerManager.isLeaderForTable(tableName)) {
+            URI tableNameURI = URIUtils.getUri(tableNameDir);
+            // Get files that are aged
+            final String[] targetFiles = pinotFS.listFiles(tableNameURI, false);
+            int numFilesDeleted = 0;
+            for (String targetFile : targetFiles) {
+              URI targetURI = URIUtils.getUri(targetFile);
+              long deletionTimeMs = getDeletionTimeMsFromFile(targetFile, pinotFS.lastModified(targetURI));
+              if (System.currentTimeMillis() >= deletionTimeMs) {
+                if (!pinotFS.delete(targetURI, true)) {
+                  LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI);
+                } else {
+                  numFilesDeleted++;
+                }
               }
             }
-          }
 
-          if (numFilesDeleted == targetFiles.length) {
-            // Delete directory if it's empty
-            if (!pinotFS.delete(tableNameURI, false)) {
-              LOGGER.warn("The directory {} cannot be removed.", tableNameDir);
+            if (numFilesDeleted == targetFiles.length) {
+              // Delete directory if it's empty
+              if (!pinotFS.delete(tableNameURI, false)) {
+                LOGGER.warn("The directory {} cannot be removed.", tableNameDir);
+              }
             }
           }
         }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 624ee914d7..ea9cda626f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -94,7 +94,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
   @Override
   protected void postprocess() {
     LOGGER.info("Removing aged deleted segments for all tables");
-    _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments();
+    _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_leadControllerManager);
   }
 
   private void manageRetentionForTable(TableConfig tableConfig) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 85aba90f36..fa6e9b0ed9 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -98,7 +98,7 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager();
 
     // Verify that the removeAgedDeletedSegments() method in deletion manager is actually called.
-    verify(deletionManager, times(1)).removeAgedDeletedSegments();
+    verify(deletionManager, times(1)).removeAgedDeletedSegments(any());
 
     // Verify that the deleteSegments method is actually called.
     verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
@@ -177,7 +177,7 @@ public class RetentionManagerTest {
           throws Throwable {
         return null;
       }
-    }).when(deletionManager).removeAgedDeletedSegments();
+    }).when(deletionManager).removeAgedDeletedSegments(any());
     when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
 
     // If and when PinotHelixResourceManager.deleteSegments() is invoked, make sure that the segments deleted
@@ -229,7 +229,7 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager();
 
     // Verify that the removeAgedDeletedSegments() method in deletion manager is actually called.
-    verify(deletionManager, times(1)).removeAgedDeletedSegments();
+    verify(deletionManager, times(1)).removeAgedDeletedSegments(any());
 
     // Verify that the deleteSegments method is actually called.
     verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index a617dd8612..2dc4c4ac0e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -52,6 +52,7 @@ import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -230,7 +231,7 @@ public class SegmentDeletionManagerTest {
         tempDir.getAbsolutePath(), helixAdmin, propertyStore, 7);
 
     // Test delete when deleted segments directory does not exists
-    deletionManager.removeAgedDeletedSegments();
+    deletionManager.removeAgedDeletedSegments(any());
 
     // Create deleted directory
     String deletedDirectoryPath = tempDir + File.separator + "Deleted_Segments";
@@ -238,7 +239,7 @@ public class SegmentDeletionManagerTest {
     deletedDirectory.mkdir();
 
     // Test delete when deleted segments directory is empty
-    deletionManager.removeAgedDeletedSegments();
+    deletionManager.removeAgedDeletedSegments(any());
 
     // Create dummy directories and files
     File dummyDir1 = new File(deletedDirectoryPath + File.separator + "dummy1");
@@ -249,7 +250,7 @@ public class SegmentDeletionManagerTest {
     dummyDir3.mkdir();
 
     // Test delete when there is no files but some directories exist
-    deletionManager.removeAgedDeletedSegments();
+    deletionManager.removeAgedDeletedSegments(any());
     Assert.assertEquals(dummyDir1.exists(), false);
     Assert.assertEquals(dummyDir2.exists(), false);
     Assert.assertEquals(dummyDir3.exists(), false);
@@ -279,7 +280,7 @@ public class SegmentDeletionManagerTest {
     Assert.assertEquals(dummyDir3.list().length, 3);
 
     // Try to remove files with the retention of 1 days.
-    deletionManager.removeAgedDeletedSegments();
+    deletionManager.removeAgedDeletedSegments(any());
 
     // Check that only 1 day retention file is remaining
     Assert.assertEquals(dummyDir1.list().length, 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org