You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/08/01 16:34:08 UTC

git commit: Fix truncate to always call flush on table

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 1879d9928 -> 60eab4e45


Fix truncate to always call flush on table

Patch by Jeremiah Jordan; reviewed by tjake for (CASSANDRA-7511)


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60eab4e4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60eab4e4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60eab4e4

Branch: refs/heads/cassandra-2.0
Commit: 60eab4e45e18d6b08350187acf56deed9654fda7
Parents: 1879d99
Author: Jake Luciani <ja...@apache.org>
Authored: Fri Aug 1 10:30:48 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Fri Aug 1 10:30:48 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++
 .../apache/cassandra/db/ColumnFamilyStore.java  | 29 +++---------------
 .../org/apache/cassandra/db/DataTracker.java    | 18 -----------
 test/conf/cassandra.yaml                        |  1 +
 .../org/apache/cassandra/db/CommitLogTest.java  | 32 ++++++++++++++++++++
 6 files changed, 44 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1fcb556..33bab82 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Fix truncate to always flush (CASSANDRA-7511)
  * Remove shuffle and taketoken (CASSANDRA-7601)
  * Switch liveRatio-related log messages to DEBUG (CASSANDRA-7467)
  * (cqlsh) Add tab-completion for CREATE/DROP USER IF [NOT] EXISTS (CASSANDRA-7611)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bf0307b..d4c1f26 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1131,6 +1131,11 @@ public class DatabaseDescriptor
         return conf.auto_snapshot;
     }
 
+    @VisibleForTesting
+    public static void setAutoSnapshot(boolean autoSnapshot) {
+        conf.auto_snapshot = autoSnapshot;
+    }
+
     public static boolean isAutoBootstrap()
     {
         return conf.auto_bootstrap;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2824924..a3c080a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2002,31 +2002,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // position in the System keyspace.
         logger.debug("truncating {}", name);
 
-        if (DatabaseDescriptor.isAutoSnapshot())
-        {
-            // flush the CF being truncated before forcing the new segment
-            forceBlockingFlush();
-
-            // sleep a little to make sure that our truncatedAt comes after any sstable
-            // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
-        }
+        // flush the CF being truncated before forcing the new segment
+        forceBlockingFlush();
 
-        // nuke the memtable data w/o writing to disk first
-        Keyspace.switchLock.writeLock().lock();
-        try
-        {
-            for (ColumnFamilyStore cfs : concatWithIndexes())
-            {
-                Memtable mt = cfs.getMemtableThreadSafe();
-                if (!mt.isClean())
-                    mt.cfs.data.renewMemtable();
-            }
-        }
-        finally
-        {
-            Keyspace.switchLock.writeLock().unlock();
-        }
+        // sleep a little to make sure that our truncatedAt comes after any sstable
+        // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
 
         Runnable truncateRunnable = new Runnable()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index a9eef98..a0f880a 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -123,24 +123,6 @@ public class DataTracker
         return toFlushMemtable;
     }
 
-    /**
-     * Renew the current memtable without putting the old one for a flush.
-     * Used when we flush but a memtable is clean (in which case we must
-     * change it because it was frozen).
-     */
-    public void renewMemtable()
-    {
-        Memtable newMemtable = new Memtable(cfstore, view.get().memtable);
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            newView = currentView.renewMemtable(newMemtable);
-        }
-        while (!view.compareAndSet(currentView, newView));
-        notifyRenewed(currentView.memtable);
-    }
-
     public void replaceFlushed(Memtable memtable, SSTableReader sstable)
     {
         // sstable may be null if we flushed batchlog and nothing needed to be retained

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index d92eba6..3bb29bb 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -6,6 +6,7 @@ cluster_name: Test Cluster
 in_memory_compaction_limit_in_mb: 1
 commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
+commitlog_segment_size_in_mb: 1
 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
 listen_address: 127.0.0.1
 storage_port: 7010

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60eab4e4/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 036ce15..a7df871 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
@@ -35,8 +36,10 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -257,4 +260,33 @@ public class CommitLogTest extends SchemaLoader
         }
     }
 
+    @Test
+    public void testTruncateWithoutSnapshot()  throws ExecutionException, InterruptedException
+    {
+        CommitLog.instance.resetUnsafe();
+        boolean prev = DatabaseDescriptor.isAutoSnapshot();
+        DatabaseDescriptor.setAutoSnapshot(false);
+        ColumnFamilyStore cfs1 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard1");
+        ColumnFamilyStore cfs2 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2");
+
+        final RowMutation rm1 = new RowMutation("Keyspace1", bytes("k"));
+        rm1.add("Standard1", bytes("c1"), ByteBuffer.allocate(100), 0);
+        rm1.apply();
+        cfs1.truncateBlocking();
+        DatabaseDescriptor.setAutoSnapshot(prev);
+        final RowMutation rm2 = new RowMutation("Keyspace1", bytes("k"));
+        rm2.add("Standard2", bytes("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
+
+        for (int i = 0 ; i < 5 ; i++)
+            CommitLog.instance.add(rm2);
+
+        Assert.assertEquals(2, CommitLog.instance.activeSegments());
+        ReplayPosition position = CommitLog.instance.getContext().get();
+        for (Keyspace ks : Keyspace.system())
+            for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
+                CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
+        CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
+        Assert.assertEquals(1, CommitLog.instance.activeSegments());
+    }
+
 }