You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/05/29 09:59:11 UTC
cassandra git commit: Avoid getting unreadable keys during anticompaction
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 f1662b147 -> eaeabffc8
Avoid getting unreadable keys during anticompaction
Patch by marcuse; reviewed by benedict for CASSANDRA-9508
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eaeabffc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eaeabffc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eaeabffc
Branch: refs/heads/cassandra-2.1
Commit: eaeabffc8e0153922fda9b3d5f79abfd8dc1c119
Parents: f1662b1
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu May 28 15:48:48 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri May 29 09:32:32 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 4 +-
.../cassandra/io/sstable/SSTableRewriter.java | 37 +++++----
.../io/sstable/SSTableRewriterTest.java | 82 ++++++++++++++------
4 files changed, 84 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ecd380d..0e759b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.6
+ * Avoid getting unreadable keys during anticompaction (CASSANDRA-9508)
* (cqlsh) Better float precision by default (CASSANDRA-9224)
* Improve estimated row count (CASSANDRA-9107)
* Optimize range tombstone memory footprint (CASSANDRA-8603)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fd28ceb..2f3f7df 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1068,8 +1068,8 @@ public class CompactionManager implements CompactionManagerMBean
sstableAsSet.add(sstable);
File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 19b2b2c..824e58b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -51,24 +51,9 @@ import static org.apache.cassandra.utils.Throwables.merge;
*/
public class SSTableRewriter
{
- private static long preemptiveOpenInterval;
- static
- {
- long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
- if (interval < 0 || FBUtilities.isWindows())
- interval = Long.MAX_VALUE;
- preemptiveOpenInterval = interval;
- }
-
- @VisibleForTesting
- static void overrideOpenInterval(long size)
- {
- preemptiveOpenInterval = size;
- }
-
private final DataTracker dataTracker;
private final ColumnFamilyStore cfs;
-
+ private final long preemptiveOpenInterval;
private final long maxAge;
private final List<SSTableReader> finished = new ArrayList<>();
private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
@@ -96,6 +81,17 @@ public class SSTableRewriter
public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
{
+ this(cfs, rewriting, maxAge, isOffline, true);
+ }
+
+ public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, boolean shouldOpenEarly)
+ {
+ this(cfs, rewriting, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly));
+ }
+
+ @VisibleForTesting
+ public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, long preemptiveOpenInterval)
+ {
this.rewriting = rewriting;
for (SSTableReader sstable : rewriting)
{
@@ -106,6 +102,15 @@ public class SSTableRewriter
this.cfs = cfs;
this.maxAge = maxAge;
this.isOffline = isOffline;
+ this.preemptiveOpenInterval = preemptiveOpenInterval;
+ }
+
+ private static long calculateOpenInterval(boolean shouldOpenEarly)
+ {
+ long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+ if (!shouldOpenEarly || interval < 0)
+ interval = Long.MAX_VALUE;
+ return interval;
}
public SSTableWriter currentWriter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 5d66f97..e1b001e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -105,8 +105,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
{
ISSTableScanner scanner = scanners.scanners.get(0);
@@ -137,8 +136,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
boolean checked = false;
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
{
@@ -234,8 +232,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
long startStorageMetricsLoad = StorageMetrics.load.count();
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -283,8 +280,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -413,8 +409,7 @@ public class SSTableRewriterTest extends SchemaLoader
DecoratedKey origLast = s.last;
long startSize = cfs.metric.liveDiskSpaceUsed.count();
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
try (ISSTableScanner scanner = s.getScanner();
@@ -448,8 +443,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -497,8 +491,7 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -542,8 +535,7 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 400);
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(1000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -628,8 +620,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
cfs.getDataTracker().markCompacting(compacting);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline, 10000000);
SSTableWriter w = getWriter(cfs, s.descriptor.directory);
rewriter.switchWriter(w);
try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
@@ -706,8 +697,7 @@ public class SSTableRewriterTest extends SchemaLoader
compacting.add(s);
cfs.getDataTracker().markCompacting(compacting);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- SSTableRewriter.overrideOpenInterval(1);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1);
SSTableWriter w = getWriter(cfs, s.descriptor.directory);
rewriter.switchWriter(w);
int keyCount = 0;
@@ -750,8 +740,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
boolean checked = false;
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
{
@@ -781,6 +770,55 @@ public class SSTableRewriterTest extends SchemaLoader
validateCFS(cfs);
}
+ /**
+ * emulates anticompaction - writing from one source sstable to two new sstables
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testTwoWriters() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
+ assertEquals(1, sstables.size());
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, false);
+ SSTableRewriter writer2 = new SSTableRewriter(cfs, sstables, 1000, false, false);
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
+ {
+ ISSTableScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while (scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Collections.singletonList(scanner.next()));
+
+ if (writer.currentWriter().getFilePointer() < 15000000)
+ writer.append(row);
+ else
+ writer2.append(row);
+ }
+ for (int i = 0; i < 5000; i++)
+ {
+ DecoratedKey key = Util.dk(ByteBufferUtil.bytes(i));
+ ColumnFamily cf = Util.getColumnFamily(keyspace, key, CF);
+ assertTrue(cf != null);
+ }
+ }
+ writer.abort();
+ writer2.abort();
+ cfs.getDataTracker().unmarkCompacting(sstables);
+ cfs.truncateBlocking();
+ SSTableDeletingTask.waitForDeletions();
+ validateCFS(cfs);
+ }
+
+
private void validateKeys(Keyspace ks)
{
for (int i = 0; i < 100; i++)