You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/01 17:34:57 UTC
svn commit: r1130191 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/db/
Author: jbellis
Date: Wed Jun 1 15:34:57 2011
New Revision: 1130191
URL: http://svn.apache.org/viewvc?rev=1130191&view=rev
Log:
fix truncate/compaction race
patch by jbellis; reviewed by slebresne for CASSANDRA-2673
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1130191&r1=1130190&r2=1130191&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jun 1 15:34:57 2011
@@ -12,6 +12,7 @@
* close scrub file handles (CASSANDRA-2669)
* throttle migration replay (CASSANDRA-2714)
* optimize column serializer creation (CASSANDRA-2716)
+ * fix truncate/compaction race (CASSANDRA-2673)
0.7.6
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1130191&r1=1130190&r2=1130191&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jun 1 15:34:57 2011
@@ -1887,8 +1887,12 @@ public class ColumnFamilyStore implement
*/
public Future<?> truncate() throws IOException
{
- // snapshot will also flush, but we want to truncate the most possible, and anything in a flush written
- // after truncateAt won't be truncated.
+ // We have two goals here:
+ // - truncate should delete everything written before truncate was invoked
+ // - but not delete anything that isn't part of the snapshot we create.
+ // We accomplish this by first flushing manually, then snapshotting, and
+ // recording the timestamp IN BETWEEN those actions. Any sstables created
+ // with this timestamp or greater time, will not be marked for delete.
try
{
forceBlockingFlush();
@@ -1897,33 +1901,20 @@ public class ColumnFamilyStore implement
{
throw new RuntimeException(e);
}
-
- final long truncatedAt = System.currentTimeMillis();
- snapshot(Table.getTimestampedSnapshotName("before-truncate"));
-
- Runnable runnable = new WrappedRunnable()
+ // sleep a little to make sure that our truncatedAt comes after any sstable
+ // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
+ try
{
- public void runMayThrow() throws InterruptedException, IOException
- {
- // putting markCompacted on the commitlogUpdater thread ensures it will run
- // after any compactions that were in progress when truncate was called, are finished
- for (ColumnFamilyStore cfs : concatWithIndexes())
- {
- List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
- for (SSTableReader sstable : cfs.getSSTables())
- {
- if (!sstable.newSince(truncatedAt))
- truncatedSSTables.add(sstable);
- }
- cfs.markCompacted(truncatedSSTables);
- }
-
- // Invalidate row cache
- invalidateRowCache();
- }
- };
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ long truncatedAt = System.currentTimeMillis();
+ snapshot(Table.getTimestampedSnapshotName("before-truncate"));
- return postFlushExecutor.submit(runnable);
+ return CompactionManager.instance.submitTruncate(this, truncatedAt);
}
// if this errors out, we are in a world of hurt.
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1130191&r1=1130190&r2=1130191&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java Wed Jun 1 15:34:57 2011
@@ -977,6 +977,30 @@ public class CompactionManager implement
return executor.submit(runnable);
}
+ public Future<?> submitTruncate(final ColumnFamilyStore main, final long truncatedAt)
+ {
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws InterruptedException, IOException
+ {
+ for (ColumnFamilyStore cfs : main.concatWithIndexes())
+ {
+ List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ if (!sstable.newSince(truncatedAt))
+ truncatedSSTables.add(sstable);
+ }
+ cfs.markCompacted(truncatedSSTables);
+ }
+
+ main.invalidateRowCache();
+ }
+ };
+
+ return executor.submit(runnable);
+ }
+
private static int getDefaultGcBefore(ColumnFamilyStore cfs)
{
return (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1130191&r1=1130190&r2=1130191&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java Wed Jun 1 15:34:57 2011
@@ -677,23 +677,6 @@ public class Table
return Iterables.transform(DatabaseDescriptor.getTables(), transformer);
}
- /**
- * Performs a synchronous truncate operation, effectively deleting all data
- * from the column family cfname
- * @param cfname
- * @throws IOException
- * @throws ExecutionException
- * @throws InterruptedException
- */
- public void truncate(String cfname) throws InterruptedException, ExecutionException, IOException
- {
- logger.debug("Truncating...");
- ColumnFamilyStore cfs = getColumnFamilyStore(cfname);
- // truncate, blocking
- cfs.truncate().get();
- logger.debug("Truncation done.");
- }
-
@Override
public String toString() {
return getClass().getSimpleName() + "(name='" + name + "')";
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1130191&r1=1130190&r2=1130191&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Wed Jun 1 15:34:57 2011
@@ -52,7 +52,8 @@ public class TruncateVerbHandler impleme
try
{
- Table.open(t.keyspace).truncate(t.columnFamily);
+ ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily);
+ cfs.truncate().get();
}
catch (IOException e)
{