Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/01/14 18:33:06 UTC

[GitHub] [cassandra] josh-mckenzie commented on a change in pull request #1379: CASSANDRA-17233: Fix deleting old CDC commit log segments

josh-mckenzie commented on a change in pull request #1379:
URL: https://github.com/apache/cassandra/pull/1379#discussion_r785045772



##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -84,21 +84,46 @@ public void discard(CommitLogSegment segment, boolean delete)
 
     /**
      * Delete the oldest hard-linked CDC commit log segment to free up space.
+     * @param bytesToFree, the minimum space to free up
      * @return total deleted file size in bytes
      */
-    public long deleteOldestLinkedCDCCommitLogSegment()
+    public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree)
     {
+        if (bytesToFree <= 0)
+            return 0;
+
         File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
         Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does not exist.");
         File[] files = cdcDir.tryList(f -> CommitLogDescriptor.isValid(f.name()));
-        Preconditions.checkState(files != null && files.length > 0,
-                                 "There should be at least 1 CDC commit log segment.");
+        if (files == null || files.length == 0)
+        {
+            logger.warn("Skip deleting due to no CDC commit log segments found.");
+            return 0;
+        }
         List<File> sorted = Arrays.stream(files)
-                                  .sorted(Comparator.comparingLong(File::lastModified))
+                                  // commit log file name (contains id) increases monotonically

Review comment:
       While that's true today, I'm a little concerned about this coupling, which is effectively undocumented: there's a comment here, but nothing on the other side indicates that this code depends on it. Could we formalize this, or add a comment to the CommitLogSegment naming/generation code saying that we depend on that behavior here, and {@link X} in the JavaDoc to tie the two together for future maintainers?
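
A sketch of one way to make the dependency explicit: parse the id out of the segment name and compare numerically, so the ordering no longer rests on string sorting. This assumes segment file names follow `CommitLog-<version>-<id>.log`. Sorting by `File::name` matches id order only while every name shares the same version and the same number of id digits. `SegmentOrdering`, `segmentId`, and `oldestFirst` are hypothetical names, not Cassandra API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SegmentOrdering
{
    // Assumed naming scheme for this sketch: CommitLog-<version>-<id>.log
    private static final Pattern NAME = Pattern.compile("CommitLog-(\\d+)-(\\d+)\\.log");

    /** Extracts the (assumed monotonically increasing) segment id from a file name. */
    static long segmentId(String fileName)
    {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches())
            throw new IllegalArgumentException("Not a commit log segment name: " + fileName);
        return Long.parseLong(m.group(2));
    }

    /** Returns a copy of the names ordered oldest-first by numeric id, not by string order. */
    static List<String> oldestFirst(List<String> fileNames)
    {
        List<String> sorted = new ArrayList<>(fileNames);
        sorted.sort(Comparator.comparingLong(SegmentOrdering::segmentId));
        return sorted;
    }
}
```

For example, `oldestFirst(List.of("CommitLog-7-10.log", "CommitLog-7-9.log"))` puts id 9 first, whereas plain string order puts `CommitLog-7-10.log` first.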

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -84,21 +84,46 @@ public void discard(CommitLogSegment segment, boolean delete)
 
     /**
      * Delete the oldest hard-linked CDC commit log segment to free up space.
+     * @param bytesToFree, the minimum space to free up
      * @return total deleted file size in bytes
      */
-    public long deleteOldestLinkedCDCCommitLogSegment()
+    public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree)
     {
+        if (bytesToFree <= 0)
+            return 0;
+
         File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
         Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does not exist.");
         File[] files = cdcDir.tryList(f -> CommitLogDescriptor.isValid(f.name()));
-        Preconditions.checkState(files != null && files.length > 0,
-                                 "There should be at least 1 CDC commit log segment.");
+        if (files == null || files.length == 0)
+        {
+            logger.warn("Skip deleting due to no CDC commit log segments found.");
+            return 0;
+        }
         List<File> sorted = Arrays.stream(files)
-                                  .sorted(Comparator.comparingLong(File::lastModified))
+                                  // commit log file name (contains id) increases monotonically
+                                  .sorted(Comparator.comparing(File::name))
                                   .collect(Collectors.toList());
-        File oldestCdcFile = sorted.get(0);
-        File cdcIndexFile = CommitLogDescriptor.inferCdcIndexFile(oldestCdcFile);
-        return deleteCDCFiles(oldestCdcFile, cdcIndexFile);
+        long bytesDeleted = 0;
+        long bytesRemaining = 0;
+        boolean deletionCompleted = false;
+        // keep deleting from old to new until it reaches to the goal or the current writting segment

Review comment:
       nit: writing

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -158,6 +185,16 @@ public void shutdown()
         return alloc;
     }
 
+    // Non-blocking mode is just enabled for CDC. The segment is still marked as FORBIDDEN.

Review comment:
       nit: rephrase to "Non-blocking mode has just recently been enabled for CDC" if that's the intent here. As written, it reads as though it's "only" set for CDC, which is a little confusing.

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -84,21 +84,46 @@ public void discard(CommitLogSegment segment, boolean delete)
 
     /**
      * Delete the oldest hard-linked CDC commit log segment to free up space.
+     * @param bytesToFree, the minimum space to free up
      * @return total deleted file size in bytes
      */
-    public long deleteOldestLinkedCDCCommitLogSegment()
+    public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree)
     {
+        if (bytesToFree <= 0)
+            return 0;
+
         File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
         Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does not exist.");
         File[] files = cdcDir.tryList(f -> CommitLogDescriptor.isValid(f.name()));
-        Preconditions.checkState(files != null && files.length > 0,
-                                 "There should be at least 1 CDC commit log segment.");
+        if (files == null || files.length == 0)
+        {
+            logger.warn("Skip deleting due to no CDC commit log segments found.");
+            return 0;
+        }
         List<File> sorted = Arrays.stream(files)
-                                  .sorted(Comparator.comparingLong(File::lastModified))
+                                  // commit log file name (contains id) increases monotonically
+                                  .sorted(Comparator.comparing(File::name))
                                   .collect(Collectors.toList());
-        File oldestCdcFile = sorted.get(0);
-        File cdcIndexFile = CommitLogDescriptor.inferCdcIndexFile(oldestCdcFile);
-        return deleteCDCFiles(oldestCdcFile, cdcIndexFile);
+        long bytesDeleted = 0;
+        long bytesRemaining = 0;
+        boolean deletionCompleted = false;
+        // keep deleting from old to new until it reaches to the goal or the current writting segment
+        for (File linkedCdcFile : sorted)
+        {
+            // only evaluate/update when deletionCompleted is false
+            deletionCompleted = deletionCompleted
+                                || (bytesDeleted >= bytesToFree || linkedCdcFile.equals(allocatingFrom().getCDCFile()));

Review comment:
       I found this conditional a little convoluted to parse. Maybe we do something like:
       ```
       // only evaluate/update when deletionCompleted is false
       if (!deletionCompleted)
           deletionCompleted = bytesDeleted >= bytesToFree || linkedCdcFile.equals(allocatingFrom().getCDCFile());
       ```
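
Below is a self-contained sketch of the loop using the suggested structure. The diff cuts off after the conditional, so the rest of the loop body is an assumption. Segments that are kept are summed into `bytesRemaining`, which is what the caller appears to expect. Segments that are deleted are summed into `bytesDeleted`. `CdcDeletionSketch` and its in-memory `Segment` are hypothetical stand-ins for the real files and for `allocatingFrom().getCDCFile()`.

```java
import java.util.List;

final class CdcDeletionSketch
{
    /** Hypothetical stand-in for a hard-linked CDC segment file. */
    static final class Segment
    {
        final String name;
        final long size;
        boolean deleted;

        Segment(String name, long size)
        {
            this.name = name;
            this.size = size;
        }
    }

    /**
     * Deletes segments oldest-first until at least bytesToFree bytes are freed or the
     * active segment is reached.
     * @return total size in bytes of the segments still on disk
     */
    static long deleteOld(List<Segment> oldestFirst, Segment active, long bytesToFree)
    {
        long bytesDeleted = 0;
        long bytesRemaining = 0;
        boolean deletionCompleted = false;
        for (Segment segment : oldestFirst)
        {
            // only evaluate/update when deletionCompleted is false; once true, it stays true
            if (!deletionCompleted)
                deletionCompleted = bytesDeleted >= bytesToFree || segment == active;

            if (deletionCompleted)
            {
                bytesRemaining += segment.size; // kept on disk
            }
            else
            {
                segment.deleted = true; // stand-in for deleting the file and its index
                bytesDeleted += segment.size;
            }
        }
        return bytesRemaining;
    }
}
```

Take three 100-byte segments, with the newest one active and `bytesToFree = 150`. The two oldest segments are deleted and the method returns 100. Comparing with `>=` preserves the meaning of the original expression. With `>`, freeing exactly `bytesToFree` bytes could delete one more segment than needed.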

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -84,21 +84,46 @@ public void discard(CommitLogSegment segment, boolean delete)
 
     /**
      * Delete the oldest hard-linked CDC commit log segment to free up space.
+     * @param bytesToFree, the minimum space to free up
      * @return total deleted file size in bytes

Review comment:
       I think we need to either amend this javadoc `@return` to say that we're returning the bytes remaining, or shuffle the variable names below. As written, it seems incorrect.

       It looks like CommitLogSegmentManagerCDC.processNewSegment expects the remaining CDC bytes, so this is probably just a vestigial javadoc entry.

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -295,18 +334,23 @@ void processNewSegment(CommitLogSegment segment)
 
         void processDiscardedSegment(CommitLogSegment segment)
         {
-            // See synchronization in CommitLogSegment.setCDCState
-            synchronized(segment.cdcStateLock)
+            if (!segment.getCDCFile().exists())
+            {
+                logger.debug("Skip updating size. The CDC commit log segement has been deleted already.");

Review comment:
       I recommend revising this to something like `logger.debug("Not processing discarded CommitLogSegment {}; this segment appears to have been deleted already.", segment)`.

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -264,29 +301,31 @@ public void start()
          */
         void processNewSegment(CommitLogSegment segment)
         {
-            // See synchronization in CommitLogSegment.setCDCState
-            synchronized(segment.cdcStateLock)
+            int segmentSize = defaultSegmentSize();
+            long allowance = allowableCDCBytes();
+            boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+
+            synchronized (segment.cdcStateLock)
             {
-                int segmentSize = defaultSegmentSize();
-                long allowance = allowableCDCBytes();
-                boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
                 segment.setCDCState(blocking && segmentSize + sizeInProgress.get() > allowance
                                     ? CDCState.FORBIDDEN
                                     : CDCState.PERMITTED);
 
-                // Remove the oldest cdc segment file when exceeding the CDC storage allowance
-                while (!blocking && segmentSize + sizeInProgress.get() > allowance)
-                {
-                    long releasedSize = segmentManager.deleteOldestLinkedCDCCommitLogSegment();
-                    sizeInProgress.getAndAdd(-releasedSize);
-                    logger.debug("Freed up {} bytes after deleting the oldest CDC commit log segment in non-blocking mode. " +
-                                 "Total on-disk CDC size: {}; allowed CDC size: {}",
-                                 releasedSize, sizeInProgress.get() + segmentSize, allowance);
-                }
-
                 // Aggresively count in the (estimated) size of new segments.
                 if (segment.getCDCState() == CDCState.PERMITTED)
-                    sizeInProgress.getAndAdd(segmentSize);
+                    addSize(segmentSize);
+            }
+
+            // Remove the oldest cdc segment file when exceeding the CDC storage allowance
+            if (!blocking && sizeInProgress.get() > allowance)
+            {
+                long bytesToFree = sizeInProgress.get() - allowance;
+                long remaningSize = segmentManager.deleteOldLinkedCDCCommitLogSegment(bytesToFree);
+                long releasedSize = sizeInProgress.get() - remaningSize;
+                sizeInProgress.getAndSet(remaningSize);
+                logger.debug("Freed up {} ({}) bytes after deleting the oldest CDC commit log segments in non-blocking mode. " +
+                             "Total on-disk CDC size: {}; allowed CDC size: {}",

Review comment:
       Should we be passing `remainingSize` as the argument for "Total on-disk CDC size:"? Passing `bytesToFree` would log `sizeInProgress - allowance`, which _seems_ off.
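
Here is a minimal sketch of the accounting this comment implies, assuming the deletion method returns the bytes still on disk. Released bytes are the difference between the size before and after deletion. The "Total on-disk CDC size" slot receives the remaining size rather than `bytesToFree`. `CdcAccountingSketch` and the `deleteOldSegments` function are hypothetical. `String.format` stands in for the SLF4J `{}` placeholders, and the argument order for the first two slots is an assumption, since the diff is cut off before the arguments.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongUnaryOperator;

final class CdcAccountingSketch
{
    /**
     * Frees space when the tracked CDC size exceeds the allowance.
     * deleteOldSegments (hypothetical) maps bytesToFree to the bytes still on disk.
     * @return the debug message, or empty if nothing needed freeing
     */
    static Optional<String> freeSpace(AtomicLong sizeInProgress, long allowance, LongUnaryOperator deleteOldSegments)
    {
        long before = sizeInProgress.get(); // one snapshot for every derived value
        if (before <= allowance)
            return Optional.empty();

        long bytesToFree = before - allowance;
        long remainingSize = deleteOldSegments.applyAsLong(bytesToFree); // bytes still on disk
        long releasedSize = before - remainingSize;
        sizeInProgress.set(remainingSize);

        // "Total on-disk CDC size" reports what is left on disk, not bytesToFree
        return Optional.of(String.format(
            "Freed up %d bytes (target %d) after deleting the oldest CDC commit log segments in non-blocking mode. "
            + "Total on-disk CDC size: %d; allowed CDC size: %d",
            releasedSize, bytesToFree, remainingSize, allowance));
    }
}
```

The sketch reads `sizeInProgress` once, so `bytesToFree` and `releasedSize` are derived from the same value.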

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -264,29 +301,31 @@ public void start()
          */
         void processNewSegment(CommitLogSegment segment)
         {
-            // See synchronization in CommitLogSegment.setCDCState
-            synchronized(segment.cdcStateLock)
+            int segmentSize = defaultSegmentSize();
+            long allowance = allowableCDCBytes();
+            boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+
+            synchronized (segment.cdcStateLock)
             {
-                int segmentSize = defaultSegmentSize();
-                long allowance = allowableCDCBytes();
-                boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
                 segment.setCDCState(blocking && segmentSize + sizeInProgress.get() > allowance
                                     ? CDCState.FORBIDDEN
                                     : CDCState.PERMITTED);
 
-                // Remove the oldest cdc segment file when exceeding the CDC storage allowance
-                while (!blocking && segmentSize + sizeInProgress.get() > allowance)
-                {
-                    long releasedSize = segmentManager.deleteOldestLinkedCDCCommitLogSegment();
-                    sizeInProgress.getAndAdd(-releasedSize);
-                    logger.debug("Freed up {} bytes after deleting the oldest CDC commit log segment in non-blocking mode. " +
-                                 "Total on-disk CDC size: {}; allowed CDC size: {}",
-                                 releasedSize, sizeInProgress.get() + segmentSize, allowance);
-                }
-
                 // Aggresively count in the (estimated) size of new segments.

Review comment:
       nit: Spelling. Should be "Aggressively" (that's on me 😄 )

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -264,29 +301,31 @@ public void start()
          */
         void processNewSegment(CommitLogSegment segment)
         {
-            // See synchronization in CommitLogSegment.setCDCState
-            synchronized(segment.cdcStateLock)
+            int segmentSize = defaultSegmentSize();
+            long allowance = allowableCDCBytes();
+            boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+
+            synchronized (segment.cdcStateLock)
             {
-                int segmentSize = defaultSegmentSize();
-                long allowance = allowableCDCBytes();
-                boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
                 segment.setCDCState(blocking && segmentSize + sizeInProgress.get() > allowance
                                     ? CDCState.FORBIDDEN
                                     : CDCState.PERMITTED);
 
-                // Remove the oldest cdc segment file when exceeding the CDC storage allowance

Review comment:
       Please keep this breadcrumb so people know where the other side of this synchronization is if they come back to modify this code later.
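
A minimal sketch of the breadcrumb convention: both places that acquire the same lock carry a comment naming the other. `BreadcrumbSketch`, its `Manager`, and the `overAllowance` flag are hypothetical simplifications of `CommitLogSegment.setCDCState` and `processNewSegment`. Java monitors are reentrant, so calling `setCDCState` while already holding `cdcStateLock` does not deadlock.

```java
final class BreadcrumbSketch
{
    enum CDCState { PERMITTED, FORBIDDEN }

    static final class Segment
    {
        // Guards cdcState; also acquired in Manager.processNewSegment
        final Object cdcStateLock = new Object();
        private CDCState cdcState = CDCState.PERMITTED;

        void setCDCState(CDCState newState)
        {
            // See synchronization in Manager.processNewSegment
            synchronized (cdcStateLock)
            {
                cdcState = newState;
            }
        }

        CDCState getCDCState()
        {
            synchronized (cdcStateLock)
            {
                return cdcState;
            }
        }
    }

    static final class Manager
    {
        void processNewSegment(Segment segment, boolean overAllowance)
        {
            // See synchronization in Segment.setCDCState
            synchronized (segment.cdcStateLock)
            {
                segment.setCDCState(overAllowance ? CDCState.FORBIDDEN : CDCState.PERMITTED);
            }
        }
    }
}
```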

##########
File path: src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
##########
@@ -264,29 +301,31 @@ public void start()
          */
         void processNewSegment(CommitLogSegment segment)
         {
-            // See synchronization in CommitLogSegment.setCDCState
-            synchronized(segment.cdcStateLock)
+            int segmentSize = defaultSegmentSize();
+            long allowance = allowableCDCBytes();
+            boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+
+            synchronized (segment.cdcStateLock)
             {
-                int segmentSize = defaultSegmentSize();
-                long allowance = allowableCDCBytes();
-                boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
                 segment.setCDCState(blocking && segmentSize + sizeInProgress.get() > allowance
                                     ? CDCState.FORBIDDEN
                                     : CDCState.PERMITTED);
 
-                // Remove the oldest cdc segment file when exceeding the CDC storage allowance
-                while (!blocking && segmentSize + sizeInProgress.get() > allowance)
-                {
-                    long releasedSize = segmentManager.deleteOldestLinkedCDCCommitLogSegment();
-                    sizeInProgress.getAndAdd(-releasedSize);
-                    logger.debug("Freed up {} bytes after deleting the oldest CDC commit log segment in non-blocking mode. " +
-                                 "Total on-disk CDC size: {}; allowed CDC size: {}",
-                                 releasedSize, sizeInProgress.get() + segmentSize, allowance);
-                }
-
                 // Aggresively count in the (estimated) size of new segments.
                 if (segment.getCDCState() == CDCState.PERMITTED)
-                    sizeInProgress.getAndAdd(segmentSize);
+                    addSize(segmentSize);
+            }
+
+            // Remove the oldest cdc segment file when exceeding the CDC storage allowance
+            if (!blocking && sizeInProgress.get() > allowance)
+            {
+                long bytesToFree = sizeInProgress.get() - allowance;
+                long remaningSize = segmentManager.deleteOldLinkedCDCCommitLogSegment(bytesToFree);

Review comment:
       nit: spelling. `remainingSize`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



For additional commands, e-mail: pr-help@cassandra.apache.org