Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/05/29 09:59:11 UTC

cassandra git commit: Avoid getting unreadable keys during anticompaction

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 f1662b147 -> eaeabffc8


Avoid getting unreadable keys during anticompaction

Patch by marcuse; reviewed by benedict for CASSANDRA-9508


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eaeabffc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eaeabffc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eaeabffc

Branch: refs/heads/cassandra-2.1
Commit: eaeabffc8e0153922fda9b3d5f79abfd8dc1c119
Parents: f1662b1
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu May 28 15:48:48 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri May 29 09:32:32 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  4 +-
 .../cassandra/io/sstable/SSTableRewriter.java   | 37 +++++----
 .../io/sstable/SSTableRewriterTest.java         | 82 ++++++++++++++------
 4 files changed, 84 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ecd380d..0e759b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.6
+ * Avoid getting unreadable keys during anticompaction (CASSANDRA-9508)
  * (cqlsh) Better float precision by default (CASSANDRA-9224)
  * Improve estimated row count (CASSANDRA-9107)
  * Optimize range tombstone memory footprint (CASSANDRA-8603)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fd28ceb..2f3f7df 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1068,8 +1068,8 @@ public class CompactionManager implements CompactionManagerMBean
             sstableAsSet.add(sstable);
 
             File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
-            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
-            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
+            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
 
             try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
                  CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
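
The only change in the anticompaction path is the extra boolean: both rewriters now pass false for the new shouldOpenEarly flag introduced below in SSTableRewriter.java. The intent (per CASSANDRA-9508) is that neither rewriter starts replacing the shared source sstable with its partial output while the other rewriter still owns some of its keys. A rough sketch of what the flag resolves to, based on the constructor chain in the next hunk (not a separate API):

    // shouldOpenEarly == false makes calculateOpenInterval() return Long.MAX_VALUE,
    // so the two calls above are equivalent to passing the interval explicitly:
    SSTableRewriter repairedSSTableWriter =
        new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, Long.MAX_VALUE);
    SSTableRewriter unRepairedSSTableWriter =
        new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, Long.MAX_VALUE);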

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 19b2b2c..824e58b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -51,24 +51,9 @@ import static org.apache.cassandra.utils.Throwables.merge;
  */
 public class SSTableRewriter
 {
-    private static long preemptiveOpenInterval;
-    static
-    {
-        long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
-        if (interval < 0 || FBUtilities.isWindows())
-            interval = Long.MAX_VALUE;
-        preemptiveOpenInterval = interval;
-    }
-
-    @VisibleForTesting
-    static void overrideOpenInterval(long size)
-    {
-        preemptiveOpenInterval = size;
-    }
-
     private final DataTracker dataTracker;
     private final ColumnFamilyStore cfs;
-
+    private final long preemptiveOpenInterval;
     private final long maxAge;
     private final List<SSTableReader> finished = new ArrayList<>();
     private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
@@ -96,6 +81,17 @@ public class SSTableRewriter
 
     public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
     {
+        this(cfs, rewriting, maxAge, isOffline, true);
+    }
+
+    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, boolean shouldOpenEarly)
+    {
+        this(cfs, rewriting, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly));
+    }
+
+    @VisibleForTesting
+    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, long preemptiveOpenInterval)
+    {
         this.rewriting = rewriting;
         for (SSTableReader sstable : rewriting)
         {
@@ -106,6 +102,15 @@ public class SSTableRewriter
         this.cfs = cfs;
         this.maxAge = maxAge;
         this.isOffline = isOffline;
+        this.preemptiveOpenInterval = preemptiveOpenInterval;
+    }
+
+    private static long calculateOpenInterval(boolean shouldOpenEarly)
+    {
+        long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+        if (!shouldOpenEarly || interval < 0)
+            interval = Long.MAX_VALUE;
+        return interval;
     }
 
     public SSTableWriter currentWriter()
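
The rewriter no longer keeps the preemptive open interval in mutable static state (the old overrideOpenInterval() test hook is gone); each instance resolves its own interval, either from the shouldOpenEarly flag via calculateOpenInterval() or from an explicit value passed through the @VisibleForTesting constructor, which is what the updated tests below use. A small self-contained sketch of the interval logic, mirroring calculateOpenInterval() above (the 50 MB figure is only an illustrative configured value, not part of this patch):

    // MB from configuration -> bytes; "never" when early open is off or config is negative
    static long openInterval(boolean shouldOpenEarly, long configuredMB)
    {
        long interval = configuredMB * (1L << 20);
        if (!shouldOpenEarly || interval < 0)
            interval = Long.MAX_VALUE;
        return interval;
    }

    // openInterval(true, 50)  -> 52428800       : open results early roughly every 50 MB written
    // openInterval(false, 50) -> Long.MAX_VALUE : early opening disabled (anticompaction case)
    // openInterval(true, -1)  -> Long.MAX_VALUE : disabled via configuration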

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 5d66f97..e1b001e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -105,8 +105,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
         {
             ISSTableScanner scanner = scanners.scanners.get(0);
@@ -137,8 +136,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
         boolean checked = false;
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
         {
@@ -234,8 +232,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         long startStorageMetricsLoad = StorageMetrics.load.count();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
@@ -283,8 +280,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
 
         Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
@@ -413,8 +409,7 @@ public class SSTableRewriterTest extends SchemaLoader
         DecoratedKey origLast = s.last;
         long startSize = cfs.metric.liveDiskSpaceUsed.count();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         try (ISSTableScanner scanner = s.getScanner();
@@ -448,8 +443,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
 
         Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
@@ -497,8 +491,7 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
@@ -542,8 +535,7 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableReader s = writeFile(cfs, 400);
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(1000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1000000);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
@@ -628,8 +620,7 @@ public class SSTableRewriterTest extends SchemaLoader
             cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         cfs.getDataTracker().markCompacting(compacting);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline, 10000000);
         SSTableWriter w = getWriter(cfs, s.descriptor.directory);
         rewriter.switchWriter(w);
         try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
@@ -706,8 +697,7 @@ public class SSTableRewriterTest extends SchemaLoader
         compacting.add(s);
         cfs.getDataTracker().markCompacting(compacting);
 
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        SSTableRewriter.overrideOpenInterval(1);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1);
         SSTableWriter w = getWriter(cfs, s.descriptor.directory);
         rewriter.switchWriter(w);
         int keyCount = 0;
@@ -750,8 +740,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
         assertEquals(1, sstables.size());
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
         boolean checked = false;
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
         {
@@ -781,6 +770,55 @@ public class SSTableRewriterTest extends SchemaLoader
         validateCFS(cfs);
     }
 
+    /**
+     * emulates anticompaction - writing from one source sstable to two new sstables
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testTwoWriters() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
+        assertEquals(1, sstables.size());
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, false);
+        SSTableRewriter writer2 = new SSTableRewriter(cfs, sstables, 1000, false, false);
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
+        {
+            ISSTableScanner scanner = scanners.scanners.get(0);
+            CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            while (scanner.hasNext())
+            {
+                AbstractCompactedRow row = new LazilyCompactedRow(controller, Collections.singletonList(scanner.next()));
+
+                if (writer.currentWriter().getFilePointer() < 15000000)
+                    writer.append(row);
+                else
+                    writer2.append(row);
+            }
+            for (int i = 0; i < 5000; i++)
+            {
+                DecoratedKey key = Util.dk(ByteBufferUtil.bytes(i));
+                ColumnFamily cf = Util.getColumnFamily(keyspace, key, CF);
+                assertTrue(cf != null);
+            }
+        }
+        writer.abort();
+        writer2.abort();
+        cfs.getDataTracker().unmarkCompacting(sstables);
+        cfs.truncateBlocking();
+        SSTableDeletingTask.waitForDeletions();
+        validateCFS(cfs);
+    }
+
+
     private void validateKeys(Keyspace ks)
     {
         for (int i = 0; i < 100; i++)