Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/05/29 09:59:50 UTC
[1/3] cassandra git commit: Avoid getting unreadable keys during anticompaction
Repository: cassandra
Updated Branches:
refs/heads/trunk 48c25ba2e -> 601944569
Avoid getting unreadable keys during anticompaction
Patch by marcuse; reviewed by benedict for CASSANDRA-9508
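
Background, in essence: anticompaction drives two SSTableRewriters over the same ordered source sstable, routing each row to a "repaired" or an "unrepaired" writer by token range. Early opening assumes a single writer consuming the source in order, and retires the prefix of the source already covered by the new sstable; with two writers splitting the stream, that retired prefix can contain keys that went to the other writer and are not yet readable there. The fix threads the preemptive-open interval through the constructor so anticompaction can disable early opening outright. A simplified sketch of the routing loop that makes early opening unsafe here (names as in the CompactionManager hunks further down):

    // Simplified sketch of anticompaction's row routing; both writers read
    // the SAME ordered source, but each receives only a subset of its keys,
    // so neither writer's progress reflects how far the source has been read.
    while (iter.hasNext())
    {
        AbstractCompactedRow row = iter.next();
        if (Range.isInRanges(row.key.getToken(), ranges))
            repairedSSTableWriter.append(row);    // key falls in a repaired range
        else
            unRepairedSSTableWriter.append(row);  // everything else
    }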
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eaeabffc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eaeabffc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eaeabffc
Branch: refs/heads/trunk
Commit: eaeabffc8e0153922fda9b3d5f79abfd8dc1c119
Parents: f1662b1
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu May 28 15:48:48 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri May 29 09:32:32 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 4 +-
.../cassandra/io/sstable/SSTableRewriter.java | 37 +++++----
.../io/sstable/SSTableRewriterTest.java | 82 ++++++++++++++------
4 files changed, 84 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ecd380d..0e759b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.6
+ * Avoid getting unreadable keys during anticompaction (CASSANDRA-9508)
* (cqlsh) Better float precision by default (CASSANDRA-9224)
* Improve estimated row count (CASSANDRA-9107)
* Optimize range tombstone memory footprint (CASSANDRA-8603)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fd28ceb..2f3f7df 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1068,8 +1068,8 @@ public class CompactionManager implements CompactionManagerMBean
sstableAsSet.add(sstable);
File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 19b2b2c..824e58b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -51,24 +51,9 @@ import static org.apache.cassandra.utils.Throwables.merge;
*/
public class SSTableRewriter
{
- private static long preemptiveOpenInterval;
- static
- {
- long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
- if (interval < 0 || FBUtilities.isWindows())
- interval = Long.MAX_VALUE;
- preemptiveOpenInterval = interval;
- }
-
- @VisibleForTesting
- static void overrideOpenInterval(long size)
- {
- preemptiveOpenInterval = size;
- }
-
private final DataTracker dataTracker;
private final ColumnFamilyStore cfs;
-
+ private final long preemptiveOpenInterval;
private final long maxAge;
private final List<SSTableReader> finished = new ArrayList<>();
private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
@@ -96,6 +81,17 @@ public class SSTableRewriter
public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
{
+ this(cfs, rewriting, maxAge, isOffline, true);
+ }
+
+ public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, boolean shouldOpenEarly)
+ {
+ this(cfs, rewriting, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly));
+ }
+
+ @VisibleForTesting
+ public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, long preemptiveOpenInterval)
+ {
this.rewriting = rewriting;
for (SSTableReader sstable : rewriting)
{
@@ -106,6 +102,15 @@ public class SSTableRewriter
this.cfs = cfs;
this.maxAge = maxAge;
this.isOffline = isOffline;
+ this.preemptiveOpenInterval = preemptiveOpenInterval;
+ }
+
+ private static long calculateOpenInterval(boolean shouldOpenEarly)
+ {
+ long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+ if (!shouldOpenEarly || interval < 0)
+ interval = Long.MAX_VALUE;
+ return interval;
}
public SSTableWriter currentWriter()
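
For reference, calculateOpenInterval above converts the configured sstable_preemptive_open_interval_in_mb to bytes and uses Long.MAX_VALUE as a "never" sentinel. A quick worked example (the 50 MB config value is assumed, not from the patch):

    long interval = 50 * (1L << 20);      // 50 MB -> 52428800 bytes
    boolean shouldOpenEarly = false;      // what anticompaction now passes
    if (!shouldOpenEarly || interval < 0) // a negative config value also disables it
        interval = Long.MAX_VALUE;        // threshold never reached: no early open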
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eaeabffc/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 5d66f97..e1b001e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -105,8 +105,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
{
ISSTableScanner scanner = scanners.scanners.get(0);
@@ -137,8 +136,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
boolean checked = false;
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
{
@@ -234,8 +232,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
long startStorageMetricsLoad = StorageMetrics.load.count();
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -283,8 +280,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -413,8 +409,7 @@ public class SSTableRewriterTest extends SchemaLoader
DecoratedKey origLast = s.last;
long startSize = cfs.metric.liveDiskSpaceUsed.count();
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
try (ISSTableScanner scanner = s.getScanner();
@@ -448,8 +443,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -497,8 +491,7 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -542,8 +535,7 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 400);
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(1000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1000000);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
@@ -628,8 +620,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
cfs.getDataTracker().markCompacting(compacting);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline, 10000000);
SSTableWriter w = getWriter(cfs, s.descriptor.directory);
rewriter.switchWriter(w);
try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
@@ -706,8 +697,7 @@ public class SSTableRewriterTest extends SchemaLoader
compacting.add(s);
cfs.getDataTracker().markCompacting(compacting);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- SSTableRewriter.overrideOpenInterval(1);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1);
SSTableWriter w = getWriter(cfs, s.descriptor.directory);
rewriter.switchWriter(w);
int keyCount = 0;
@@ -750,8 +740,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
boolean checked = false;
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
{
@@ -781,6 +770,55 @@ public class SSTableRewriterTest extends SchemaLoader
validateCFS(cfs);
}
+ /**
+ * emulates anticompaction - writing from one source sstable to two new sstables
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testTwoWriters() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
+ assertEquals(1, sstables.size());
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, false);
+ SSTableRewriter writer2 = new SSTableRewriter(cfs, sstables, 1000, false, false);
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
+ {
+ ISSTableScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while (scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Collections.singletonList(scanner.next()));
+
+ if (writer.currentWriter().getFilePointer() < 15000000)
+ writer.append(row);
+ else
+ writer2.append(row);
+ }
+ for (int i = 0; i < 5000; i++)
+ {
+ DecoratedKey key = Util.dk(ByteBufferUtil.bytes(i));
+ ColumnFamily cf = Util.getColumnFamily(keyspace, key, CF);
+ assertTrue(cf != null);
+ }
+ }
+ writer.abort();
+ writer2.abort();
+ cfs.getDataTracker().unmarkCompacting(sstables);
+ cfs.truncateBlocking();
+ SSTableDeletingTask.waitForDeletions();
+ validateCFS(cfs);
+ }
+
+
private void validateKeys(Keyspace ks)
{
for (int i = 0; i < 100; i++)
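
A note on the test changes above: every SSTableRewriter.overrideOpenInterval(...) call paired with the four-argument constructor collapses into the new five-argument @VisibleForTesting constructor, so the interval is per-instance state rather than a mutable static shared across tests. Typical before/after, as seen throughout the hunks:

    // Before: mutable static state, leaks into whichever test runs next.
    SSTableRewriter.overrideOpenInterval(10000000);
    SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);

    // After: explicit per-instance interval, in bytes.
    SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);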
[3/3] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60194456
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60194456
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60194456
Branch: refs/heads/trunk
Commit: 601944569cbe8f8aed5f1e3448828586ba3bc936
Parents: 48c25ba 180130a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri May 29 09:58:49 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri May 29 09:58:49 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 4 +-
.../cassandra/io/sstable/SSTableRewriter.java | 41 +++++-----
.../unit/org/apache/cassandra/db/ScrubTest.java | 6 +-
.../io/sstable/SSTableRewriterTest.java | 82 ++++++++++++++------
5 files changed, 88 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60194456/CHANGES.txt
----------------------------------------------------------------------
[2/3] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/180130a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/180130a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/180130a8
Branch: refs/heads/trunk
Commit: 180130a8e59b2848ad843d74c09fabfa5e82eab1
Parents: 7d06762 eaeabff
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri May 29 09:57:54 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri May 29 09:57:54 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 4 +-
.../cassandra/io/sstable/SSTableRewriter.java | 41 +++++-----
.../unit/org/apache/cassandra/db/ScrubTest.java | 6 +-
.../io/sstable/SSTableRewriterTest.java | 82 ++++++++++++++------
5 files changed, 88 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 618f063,0e759b7..262f8c8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
-2.1.6
+2.2
+ * Disallow frozen<> types in function arguments and return types for
+ clarity (CASSANDRA-9411)
+ * Static Analysis to warn on unsafe use of Autocloseable instances (CASSANDRA-9431)
+ * Update commitlog archiving examples now that commitlog segments are
+ not recycled (CASSANDRA-9350)
+ * Extend Transactional API to sstable lifecycle management (CASSANDRA-8568)
+ * (cqlsh) Add support for native protocol 4 (CASSANDRA-9399)
+ * Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
+ * Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
+ * Add ability to stop compaction by ID (CASSANDRA-7207)
+ * Let CassandraVersion handle SNAPSHOT version (CASSANDRA-9438)
+Merged from 2.1:
+ * Avoid getting unreadable keys during anticompaction (CASSANDRA-9508)
* (cqlsh) Better float precision by default (CASSANDRA-9224)
* Improve estimated row count (CASSANDRA-9107)
* Optimize range tombstone memory footprint (CASSANDRA-8603)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 26dab7c,2f3f7df..a2783da
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1187,90 -1060,68 +1187,90 @@@ public class CompactionManager implemen
if (!new File(sstable.getFilename()).exists())
{
logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
+ i.remove();
continue;
}
+ if (groupMaxDataAge < sstable.maxDataAge)
+ groupMaxDataAge = sstable.maxDataAge;
+ }
- logger.info("Anticompacting {}", sstable);
- Set<SSTableReader> sstableAsSet = new HashSet<>();
- sstableAsSet.add(sstable);
+ if (anticompactionGroup.originals().size() == 0)
+ {
+ logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
+ return 0;
+ }
- File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
+ logger.info("Anticompacting {}", anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
+
+ File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
+ long repairedKeyCount = 0;
+ long unrepairedKeyCount = 0;
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false);
++ try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
++ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
+ AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
+ CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
+ {
+ int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
- CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
- {
- int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)sstable.estimatedKeys());
- repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
- unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+ repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+ unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- metrics.beginCompaction(ci);
- try
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
+ metrics.beginCompaction(ci);
+ try
+ {
+ @SuppressWarnings("resource")
+ CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+ while (iter.hasNext())
{
- while (iter.hasNext())
+ @SuppressWarnings("resource")
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into the new repaired sstable
+ if (Range.isInRanges(row.key.getToken(), ranges))
{
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
+ {
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
}
}
- finally
- {
- metrics.finishCompaction(ci);
- }
- anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
}
- catch (Throwable e)
+ finally
{
- JVMStabilityInspector.inspectThrowable(e);
- logger.error("Error anticompacting " + sstable, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
+ metrics.finishCompaction(ci);
}
- }
- String format = "Repaired {} keys of {} for {}/{}";
- logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
- String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
- return anticompactedSSTables;
+ List<SSTableReader> anticompactedSSTables = new ArrayList<>();
+ // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
+ // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API
+ // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted.
+ anticompactionGroup.permitRedundantTransitions();
+ repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
+ unRepairedSSTableWriter.prepareToCommit();
+ anticompactedSSTables.addAll(repairedSSTableWriter.finished());
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
+ repairedSSTableWriter.commit();
+ unRepairedSSTableWriter.commit();
+
+ logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
+ repairedKeyCount + unrepairedKeyCount,
+ cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(),
+ anticompactionGroup);
+ return anticompactedSSTables.size();
+ }
+ catch (Throwable e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ logger.error("Error anticompacting " + anticompactionGroup, e);
+ }
+ return 0;
}
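
The merge markers above are noisy; condensed, the trunk-side finishing sequence for the two rewriters is:

    // Condensed from the hunk above. Both rewriters operate over the same
    // LifecycleTransaction, so Transactional.finish() cannot be used twice;
    // each phase is driven explicitly, with redundant state transitions
    // permitted on the shared transaction.
    anticompactionGroup.permitRedundantTransitions();
    repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
    unRepairedSSTableWriter.prepareToCommit();
    anticompactedSSTables.addAll(repairedSSTableWriter.finished());
    anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
    repairedSSTableWriter.commit();
    unRepairedSSTableWriter.commit();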
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 8029075,824e58b..011c7d9
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@@ -47,56 -49,68 +47,59 @@@ import org.apache.cassandra.utils.concu
* but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished
* successfully.
*/
-public class SSTableRewriter
+public class SSTableRewriter extends Transactional.AbstractTransactional implements Transactional
{
- private static long preemptiveOpenInterval;
- static
- {
- long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
- if (interval < 0)
- interval = Long.MAX_VALUE;
- preemptiveOpenInterval = interval;
- }
-
- @VisibleForTesting
- public static void overrideOpenInterval(long size)
- {
- preemptiveOpenInterval = size;
- }
- private final DataTracker dataTracker;
+ @VisibleForTesting
- public static long getOpenInterval()
- {
- return preemptiveOpenInterval;
- }
++ public static boolean disableEarlyOpeningForTests = false;
+
private final ColumnFamilyStore cfs;
-
+ private final long preemptiveOpenInterval;
private final long maxAge;
- private final List<SSTableReader> finished = new ArrayList<>();
- private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
- private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
+ private long repairedAt = -1;
+ // the set of final readers we will expose on commit
+ private final LifecycleTransaction transaction; // the readers we are rewriting (updated as they are replaced)
+ private final List<SSTableReader> preparedForCommit = new ArrayList<>();
private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
- private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
- private final List<SSTableReader> finishedReaders = new ArrayList<>();
- private final Queue<Finished> finishedEarly = new ArrayDeque<>();
- // as writers are closed from finishedEarly, their last readers are moved
- // into discard, so that abort can cleanup after us safely
- private final List<SSTableReader> discard = new ArrayList<>();
- private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+ private final List<SSTableWriter> writers = new ArrayList<>();
+ private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker)
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
- private State state = State.WORKING;
- private static enum State
- {
- WORKING, FINISHED, ABORTED
- }
+ // for testing (TODO: remove when have byteman setup)
+ private boolean throwEarly, throwLate;
- public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
+ public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline)
{
- this(cfs, rewriting, maxAge, isOffline, true);
++ this(cfs, transaction, maxAge, isOffline, true);
+ }
+
- public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, boolean shouldOpenEarly)
++ public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
+ {
- this(cfs, rewriting, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly));
++ this(cfs, transaction, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly));
+ }
+
+ @VisibleForTesting
- public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, long preemptiveOpenInterval)
++ public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline, long preemptiveOpenInterval)
+ {
- this.rewriting = rewriting;
- for (SSTableReader sstable : rewriting)
- {
- originalStarts.put(sstable.descriptor, sstable.first);
+ this.transaction = transaction;
+ for (SSTableReader sstable : this.transaction.originals())
fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename()));
- }
- this.dataTracker = cfs.getDataTracker();
this.cfs = cfs;
this.maxAge = maxAge;
this.isOffline = isOffline;
+ this.preemptiveOpenInterval = preemptiveOpenInterval;
+ }
+
+ private static long calculateOpenInterval(boolean shouldOpenEarly)
+ {
+ long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
- if (!shouldOpenEarly || interval < 0)
++ if (disableEarlyOpeningForTests || !shouldOpenEarly || interval < 0)
+ interval = Long.MAX_VALUE;
+ return interval;
}
public SSTableWriter currentWriter()
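
On trunk, where the rewriter is built around a LifecycleTransaction, the constructor chain is kept but a static disableEarlyOpeningForTests flag is added for tests that cannot easily thread an interval through (see ScrubTest below). Per the ScrubTest hunk, the expected usage is save/flip/restore:

    // Sketch of the test-only pattern from the ScrubTest change further down.
    boolean old = SSTableRewriter.disableEarlyOpeningForTests;
    try
    {
        SSTableRewriter.disableEarlyOpeningForTests = true;
        // ... exercise code that must not open sstables early ...
    }
    finally
    {
        SSTableRewriter.disableEarlyOpeningForTests = old;
    }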
http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index dbbce9e,028cf6c..d4579af
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -273,21 -234,6 +273,21 @@@ public class ScrubTes
}
@Test
+ public void testScrubCorruptedCounterRowNoEarlyOpen() throws IOException, WriteTimeoutException
+ {
- long oldOpenVal = SSTableRewriter.getOpenInterval();
++ boolean oldDisabledVal = SSTableRewriter.disableEarlyOpeningForTests;
+ try
+ {
- SSTableRewriter.overrideOpenInterval(Long.MAX_VALUE);
++ SSTableRewriter.disableEarlyOpeningForTests = true;
+ testScrubCorruptedCounterRow();
+ }
+ finally
+ {
- SSTableRewriter.overrideOpenInterval(oldOpenVal);
++ SSTableRewriter.disableEarlyOpeningForTests = oldDisabledVal;
+ }
+ }
+
+ @Test
public void testScrubDeletedRow() throws ExecutionException, InterruptedException
{
CompactionManager.instance.disableAutoCompaction();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 265bb6a,e1b001e..9e1cb91
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -130,10 -105,8 +131,10 @@@ public class SSTableRewriterTest extend
cfs.addSSTable(s);
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
++
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);)
++ SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000);)
{
ISSTableScanner scanner = scanners.scanners.get(0);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@@ -164,11 -136,9 +165,11 @@@
cfs.addSSTable(s);
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
++
boolean checked = false;
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);)
++ SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000))
{
ISSTableScanner scanner = scanners.scanners.get(0);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@@ -261,19 -230,15 +262,18 @@@
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
- long startStorageMetricsLoad = StorageMetrics.load.count();
+ long startStorageMetricsLoad = StorageMetrics.load.getCount();
+ long sBytesOnDisk = s.bytesOnDisk();
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ List<SSTableReader> sstables;
int files = 1;
try (ISSTableScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@@ -315,16 -280,13 +315,15 @@@
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ List<SSTableReader> sstables;
int files = 1;
try (ISSTableScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@@ -437,22 -407,24 +436,21 @@@
DecoratedKey origFirst = s.first;
DecoratedKey origLast = s.last;
- long startSize = cfs.metric.liveDiskSpaceUsed.count();
+ long startSize = cfs.metric.liveDiskSpaceUsed.getCount();
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
try (ISSTableScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);)
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
test.run(scanner, controller, s, cfs, rewriter);
}
- catch (Throwable t)
- {
- rewriter.abort();
- throw t;
- }
- Thread.sleep(1000);
- assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
+ SSTableDeletingTask.waitForDeletions();
+
+ assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount());
assertEquals(1, cfs.getSSTables().size());
assertFileCounts(s.descriptor.directory.list(), 0, 0);
assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
@@@ -471,15 -443,13 +469,14 @@@
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
try (ISSTableScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@@ -516,16 -491,13 +513,15 @@@
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ List<SSTableReader> sstables;
int files = 1;
try (ISSTableScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@@ -558,16 -535,13 +554,15 @@@
SSTableReader s = writeFile(cfs, 400);
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(1000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1000000);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ List<SSTableReader> sstables;
int files = 1;
try (ISSTableScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 1000000);)
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@@ -648,15 -619,13 +643,14 @@@
if (!offline)
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- cfs.getDataTracker().markCompacting(compacting);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline, 10000000);
- SSTableWriter w = getWriter(cfs, s.descriptor.directory);
- rewriter.switchWriter(w);
try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ LifecycleTransaction txn = offline ? LifecycleTransaction.offline(OperationType.UNKNOWN, compacting)
+ : cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, offline);
++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, offline, 10000000);
+ )
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while (scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@@ -729,16 -695,15 +723,15 @@@
SSTableReader s = cfs.getSSTables().iterator().next();
Set<SSTableReader> compacting = new HashSet<>();
compacting.add(s);
- cfs.getDataTracker().markCompacting(compacting);
- SSTableRewriter.overrideOpenInterval(1);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1);
- SSTableWriter w = getWriter(cfs, s.descriptor.directory);
- rewriter.switchWriter(w);
int keyCount = 0;
try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);
++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 1);
+ )
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while (scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@@ -765,16 -738,14 +758,15 @@@
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
- Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
+ Set<SSTableReader> sstables = Sets.newHashSet(s);
assertEquals(1, sstables.size());
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000);
boolean checked = false;
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
+ try (ISSTableScanner scanner = sstables.iterator().next().getScanner();
+ CompactionController controller = new CompactionController(cfs, sstables, 0);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);
++ SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
+ )
{
- ISSTableScanner scanner = scanners.scanners.get(0);
- CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
while (scanner.hasNext())
{
@@@ -796,6 -770,55 +788,52 @@@
validateCFS(cfs);
}
+ /**
+ * emulates anticompaction - writing from one source sstable to two new sstables
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testTwoWriters() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
- Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
++ Set<SSTableReader> sstables = Sets.newHashSet(s);
+ assertEquals(1, sstables.size());
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, false);
- SSTableRewriter writer2 = new SSTableRewriter(cfs, sstables, 1000, false, false);
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
++ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
++ LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
++ SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, false);
++ SSTableRewriter writer2 = new SSTableRewriter(cfs, txn, 1000, false, false))
+ {
+ ISSTableScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while (scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Collections.singletonList(scanner.next()));
+
+ if (writer.currentWriter().getFilePointer() < 15000000)
+ writer.append(row);
+ else
+ writer2.append(row);
+ }
+ for (int i = 0; i < 5000; i++)
+ {
+ DecoratedKey key = Util.dk(ByteBufferUtil.bytes(i));
+ ColumnFamily cf = Util.getColumnFamily(keyspace, key, CF);
+ assertTrue(cf != null);
+ }
+ }
- writer.abort();
- writer2.abort();
- cfs.getDataTracker().unmarkCompacting(sstables);
- cfs.truncateBlocking();
- SSTableDeletingTask.waitForDeletions();
++ truncateCF();
+ validateCFS(cfs);
+ }
+
+
private void validateKeys(Keyspace ks)
{
for (int i = 0; i < 100; i++)