You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/07/03 00:43:22 UTC

[07/10] git commit: Ensure writes have completed after dropping a table, before recycling commit log segments

Ensure writes have completed after dropping a table, before recycling commit log segments

patch by Benedict Elliott Smith; reviewed by Jake Luciani for CASSANDRA-7437


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e393c93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e393c93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e393c93

Branch: refs/heads/trunk
Commit: 0e393c9356fdd6c5ad4b9d733b7f92056342ce47
Parents: 29a9934
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jul 2 23:41:22 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jul 2 23:41:22 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |  2 ++
 src/java/org/apache/cassandra/db/DefsTables.java   | 17 +++++++++++------
 src/java/org/apache/cassandra/db/Keyspace.java     |  4 ++++
 .../apache/cassandra/db/commitlog/CommitLog.java   | 17 +++++++++--------
 .../cassandra/db/commitlog/CommitLogSegment.java   |  6 ++----
 .../db/commitlog/CommitLogSegmentManager.java      | 11 +++++++++--
 .../apache/cassandra/utils/concurrent/OpOrder.java |  7 +++++++
 7 files changed, 44 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c19690b..22c4123 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,7 @@
 2.1.0
  * (Windows) handle spaces in path names (CASSANDRA-7451)
+ * Ensure writes have completed after dropping a table, before recycling
+   commit log segments (CASSANDRA-7437)
 Merged from 2.0:
  * Fix CC#collectTimeOrderedData() tombstone optimisations (CASSANDRA-7394)
  * Support DISTINCT for static columns and fix behaviour when DISTINC is

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index ede3ebd..98ced8d 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -468,10 +468,13 @@ public class DefsTables
 
         CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
 
+        Keyspace keyspace = Keyspace.open(ksm.name);
+
         // remove all cfs from the keyspace instance.
+        List<UUID> droppedCfs = new ArrayList<>();
         for (CFMetaData cfm : ksm.cfMetaData().values())
         {
-            ColumnFamilyStore cfs = Keyspace.open(ksm.name).getColumnFamilyStore(cfm.cfName);
+            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName);
 
             Schema.instance.purge(cfm);
 
@@ -481,15 +484,18 @@ public class DefsTables
                     cfs.snapshot(snapshotName);
                 Keyspace.open(ksm.name).dropCf(cfm.cfId);
             }
-            CommitLog.instance.discardColumnFamily(cfm.cfId);
+
+            droppedCfs.add(cfm.cfId);
         }
 
         // remove the keyspace from the static instances.
         Keyspace.clear(ksm.name);
         Schema.instance.clearKeyspaceDefinition(ksm);
 
+        keyspace.writeOrder.awaitNewBarrier();
+
         // force a new segment in the CL
-        CommitLog.instance.forceRecycleAllSegments();
+        CommitLog.instance.forceRecycleAllSegments(droppedCfs);
 
         if (!StorageService.instance.isClientMode())
         {
@@ -512,15 +518,14 @@ public class DefsTables
 
         CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
 
-        CommitLog.instance.discardColumnFamily(cfm.cfId);
-        CommitLog.instance.forceRecycleAllSegments();
-
         if (!StorageService.instance.isClientMode())
         {
             if (DatabaseDescriptor.isAutoSnapshot())
                 cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
             Keyspace.open(ksm.name).dropCf(cfm.cfId);
             MigrationManager.instance.notifyDropColumnFamily(cfm);
+
+            CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ceeac70..ff4d272 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -298,6 +298,10 @@ public class Keyspace
         if (cfs == null)
             return;
 
+        // wait for any outstanding reads/writes that might affect the CFS
+        cfs.keyspace.writeOrder.awaitNewBarrier();
+        cfs.readOrdering.awaitNewBarrier();
+
         unloadCf(cfs);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 41c01c3..cf8a7f6 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -158,9 +158,17 @@ public class CommitLog implements CommitLogMBean
     /**
      * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
      */
+    public void forceRecycleAllSegments(Iterable<UUID> droppedCfs)
+    {
+        allocator.forceRecycleAll(droppedCfs);
+    }
+
+    /**
+     * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
+     */
     public void forceRecycleAllSegments()
     {
-        allocator.forceRecycleAll();
+        allocator.forceRecycleAll(Collections.<UUID>emptyList());
     }
 
     /**
@@ -240,13 +248,6 @@ public class CommitLog implements CommitLogMBean
         return alloc;
     }
 
-    public void discardColumnFamily(final UUID cfId)
-    {
-        ReplayPosition context = getContext();
-        for (CommitLogSegment cls : allocator.getActiveSegments())
-            cls.markClean(cfId, context);
-    }
-
     /**
      * Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position
      * given. Discards any commit log segments that are no longer used.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 78ba824..a983f01 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -251,12 +251,10 @@ public class CommitLogSegment
     /**
      * Wait for any appends or discardUnusedTail() operations started before this method was called
      */
-    private synchronized void waitForModifications()
+    void waitForModifications()
     {
         // issue a barrier and wait for it
-        OpOrder.Barrier barrier = appendOrder.newBarrier();
-        barrier.issue();
-        barrier.await();
+        appendOrder.awaitNewBarrier();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index ed0a7ff..e1a7e39 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -286,18 +286,24 @@ public class CommitLogSegmentManager
      * Flushes any dirty CFs for this segment and any older segments, and then recycles
      * the segments
      */
-    void forceRecycleAll()
+    void forceRecycleAll(Iterable<UUID> droppedCfs)
     {
         List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
         CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
         advanceAllocatingFrom(last);
 
+        last.waitForModifications();
+
         // flush and wait for all CFs that are dirty in segments up-to and including 'last'
         Future<?> future = flushDataFrom(segmentsToRecycle, true);
         try
         {
             future.get();
 
+            for (CommitLogSegment segment : activeSegments)
+                for (UUID cfId : droppedCfs)
+                    segment.markClean(cfId, segment.getContext());
+
             // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
             // if the previous active segment was the only one to recycle (since an active segment isn't
             // necessarily dirty, and we only call dCS after a flush).
@@ -306,7 +312,8 @@ public class CommitLogSegmentManager
                     recycleSegment(segment);
 
             CommitLogSegment first;
-            assert (first = activeSegments.peek()) == null || first.id > last.id;
+            if ((first = activeSegments.peek()) != null && first.id <= last.id)
+                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
index bc43e10..5cebf44 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
@@ -124,6 +124,13 @@ public class OpOrder
         return current;
     }
 
+    public void awaitNewBarrier()
+    {
+        Barrier barrier = newBarrier();
+        barrier.issue();
+        barrier.await();
+    }
+
     /**
      * Represents a group of identically ordered operations, i.e. all operations started in the interval between
      * two barrier issuances. For each register() call this is returned, close() must be called exactly once.