You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/07/03 00:43:22 UTC
[07/10] git commit: Ensure writes have completed after dropping a
table, before recycling commit log segments
Ensure writes have completed after dropping a table, before recycling commit log segments
patch by Benedict Elliott Smith; reviewed by Jake Luciani for CASSANDRA-7437
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e393c93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e393c93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e393c93
Branch: refs/heads/trunk
Commit: 0e393c9356fdd6c5ad4b9d733b7f92056342ce47
Parents: 29a9934
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jul 2 23:41:22 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jul 2 23:41:22 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 ++
src/java/org/apache/cassandra/db/DefsTables.java | 17 +++++++++++------
src/java/org/apache/cassandra/db/Keyspace.java | 4 ++++
.../apache/cassandra/db/commitlog/CommitLog.java | 17 +++++++++--------
.../cassandra/db/commitlog/CommitLogSegment.java | 6 ++----
.../db/commitlog/CommitLogSegmentManager.java | 11 +++++++++--
.../apache/cassandra/utils/concurrent/OpOrder.java | 7 +++++++
7 files changed, 44 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c19690b..22c4123 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,7 @@
2.1.0
* (Windows) handle spaces in path names (CASSANDRA-7451)
+ * Ensure writes have completed after dropping a table, before recycling
+ commit log segments (CASSANDRA-7437)
Merged from 2.0:
* Fix CC#collectTimeOrderedData() tombstone optimisations (CASSANDRA-7394)
* Support DISTINCT for static columns and fix behaviour when DISTINC is
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index ede3ebd..98ced8d 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -468,10 +468,13 @@ public class DefsTables
CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
+ Keyspace keyspace = Keyspace.open(ksm.name);
+
// remove all cfs from the keyspace instance.
+ List<UUID> droppedCfs = new ArrayList<>();
for (CFMetaData cfm : ksm.cfMetaData().values())
{
- ColumnFamilyStore cfs = Keyspace.open(ksm.name).getColumnFamilyStore(cfm.cfName);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName);
Schema.instance.purge(cfm);
@@ -481,15 +484,18 @@ public class DefsTables
cfs.snapshot(snapshotName);
Keyspace.open(ksm.name).dropCf(cfm.cfId);
}
- CommitLog.instance.discardColumnFamily(cfm.cfId);
+
+ droppedCfs.add(cfm.cfId);
}
// remove the keyspace from the static instances.
Keyspace.clear(ksm.name);
Schema.instance.clearKeyspaceDefinition(ksm);
+ keyspace.writeOrder.awaitNewBarrier();
+
// force a new segment in the CL
- CommitLog.instance.forceRecycleAllSegments();
+ CommitLog.instance.forceRecycleAllSegments(droppedCfs);
if (!StorageService.instance.isClientMode())
{
@@ -512,15 +518,14 @@ public class DefsTables
CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
- CommitLog.instance.discardColumnFamily(cfm.cfId);
- CommitLog.instance.forceRecycleAllSegments();
-
if (!StorageService.instance.isClientMode())
{
if (DatabaseDescriptor.isAutoSnapshot())
cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
Keyspace.open(ksm.name).dropCf(cfm.cfId);
MigrationManager.instance.notifyDropColumnFamily(cfm);
+
+ CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ceeac70..ff4d272 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -298,6 +298,10 @@ public class Keyspace
if (cfs == null)
return;
+ // wait for any outstanding reads/writes that might affect the CFS
+ cfs.keyspace.writeOrder.awaitNewBarrier();
+ cfs.readOrdering.awaitNewBarrier();
+
unloadCf(cfs);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 41c01c3..cf8a7f6 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -158,9 +158,17 @@ public class CommitLog implements CommitLogMBean
/**
* Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
*/
+ public void forceRecycleAllSegments(Iterable<UUID> droppedCfs)
+ {
+ allocator.forceRecycleAll(droppedCfs);
+ }
+
+ /**
+ * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
+ */
public void forceRecycleAllSegments()
{
- allocator.forceRecycleAll();
+ allocator.forceRecycleAll(Collections.<UUID>emptyList());
}
/**
@@ -240,13 +248,6 @@ public class CommitLog implements CommitLogMBean
return alloc;
}
- public void discardColumnFamily(final UUID cfId)
- {
- ReplayPosition context = getContext();
- for (CommitLogSegment cls : allocator.getActiveSegments())
- cls.markClean(cfId, context);
- }
-
/**
* Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position
* given. Discards any commit log segments that are no longer used.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 78ba824..a983f01 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -251,12 +251,10 @@ public class CommitLogSegment
/**
* Wait for any appends or discardUnusedTail() operations started before this method was called
*/
- private synchronized void waitForModifications()
+ void waitForModifications()
{
// issue a barrier and wait for it
- OpOrder.Barrier barrier = appendOrder.newBarrier();
- barrier.issue();
- barrier.await();
+ appendOrder.awaitNewBarrier();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index ed0a7ff..e1a7e39 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -286,18 +286,24 @@ public class CommitLogSegmentManager
* Flushes any dirty CFs for this segment and any older segments, and then recycles
* the segments
*/
- void forceRecycleAll()
+ void forceRecycleAll(Iterable<UUID> droppedCfs)
{
List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
advanceAllocatingFrom(last);
+ last.waitForModifications();
+
// flush and wait for all CFs that are dirty in segments up-to and including 'last'
Future<?> future = flushDataFrom(segmentsToRecycle, true);
try
{
future.get();
+ for (CommitLogSegment segment : activeSegments)
+ for (UUID cfId : droppedCfs)
+ segment.markClean(cfId, segment.getContext());
+
// now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
// if the previous active segment was the only one to recycle (since an active segment isn't
// necessarily dirty, and we only call dCS after a flush).
@@ -306,7 +312,8 @@ public class CommitLogSegmentManager
recycleSegment(segment);
CommitLogSegment first;
- assert (first = activeSegments.peek()) == null || first.id > last.id;
+ if ((first = activeSegments.peek()) != null && first.id <= last.id)
+ logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
}
catch (Throwable t)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e393c93/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
index bc43e10..5cebf44 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
@@ -124,6 +124,13 @@ public class OpOrder
return current;
}
+ public void awaitNewBarrier()
+ {
+ Barrier barrier = newBarrier();
+ barrier.issue();
+ barrier.await();
+ }
+
/**
* Represents a group of identically ordered operations, i.e. all operations started in the interval between
* two barrier issuances. For each register() call this is returned, close() must be called exactly once.