You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/03 17:03:37 UTC
[1/5] git commit: Make sure unfinished compaction files are removed.
Repository: cassandra
Updated Branches:
refs/heads/trunk e60a06cc8 -> 0f59629ce
Make sure unfinished compaction files are removed.
Patch by marcuse; reviewed by yukim for CASSANDRA-8124
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c316e78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c316e78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c316e78
Branch: refs/heads/trunk
Commit: 9c316e7858f6dbf9df892aff78431044aa104ed9
Parents: d230922
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 17 14:15:46 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 16:17:01 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableReader.java | 6 +
.../cassandra/io/sstable/SSTableRewriter.java | 90 +++-
.../io/sstable/SSTableRewriterTest.java | 473 +++++++++++++++++++
4 files changed, 555 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 494fb93..681d616 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.2
+ * Make sure unfinished compaction files are removed (CASSANDRA-8124)
* Fix shutdown when run as Windows service (CASSANDRA-8136)
* Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
* Fix race in RecoveryManagerTest (CASSANDRA-8176)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 872f7df..40e708d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1599,6 +1599,12 @@ public class SSTableReader extends SSTable
}
}
+ @VisibleForTesting
+ int referenceCount()
+ {
+ return references.get();
+ }
+
/**
* Release reference to this SSTableReader.
* If there is no one referring to this SSTable, and is marked as compacted,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 76677ac..2c9fe7e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -25,8 +25,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
/**
* Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
@@ -55,8 +58,7 @@ import org.apache.cassandra.utils.FBUtilities;
*/
public class SSTableRewriter
{
-
- private static final long preemptiveOpenInterval;
+ private static long preemptiveOpenInterval;
static
{
long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
@@ -65,6 +67,14 @@ public class SSTableRewriter
preemptiveOpenInterval = interval;
}
+ private boolean isFinished = false;
+
+ @VisibleForTesting
+ static void overrideOpenInterval(long size)
+ {
+ preemptiveOpenInterval = size;
+ }
+
private final DataTracker dataTracker;
private final ColumnFamilyStore cfs;
@@ -77,6 +87,8 @@ public class SSTableRewriter
private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
+ private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
+ private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
private final OperationType rewriteType; // the type of rewrite/compaction being performed
private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
@@ -187,20 +199,32 @@ public class SSTableRewriter
{
if (writer == null)
return;
+
+ switchWriter(null);
+
moveStarts(null, Functions.forMap(originalStarts), true);
- List<SSTableReader> close = new ArrayList<>(finished);
+
+ List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
if (currentlyOpenedEarly != null)
close.add(currentlyOpenedEarly);
+
+ for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+ {
+ // we should close the bloom filter if we have not opened an sstable reader from this
+ // writer (it will get closed when we release the sstable reference below):
+ w.left.abort(w.right == null);
+ }
+
// also remove already completed SSTables
for (SSTableReader sstable : close)
sstable.markObsolete();
+
// releases reference in replaceReaders
if (!isOffline)
{
dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false);
dataTracker.unmarkCompacting(close);
}
- writer.abort(currentlyOpenedEarly == null);
}
/**
@@ -208,6 +232,11 @@ public class SSTableRewriter
* needed, and transferring any key cache entries over to the new reader, expiring them from the old. if reset
* is true, we are instead restoring the starts of the readers from before the rewriting began
*
+ * note that we replace an existing sstable with a new *instance* of the same sstable, the replacement
+ * sstable .equals() the old one, BUT, it is a new instance, so, for example, since we releaseReference() on the old
+ * one, the old *instance* will have reference count == 0 and if we were to start a new compaction with that old
+ * instance, we would get exceptions.
+ *
* @param newReader the rewritten reader that replaces them for this region
* @param newStarts a function mapping a reader's descriptor to their new start value
* @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
@@ -284,11 +313,15 @@ public class SSTableRewriter
writer = newWriter;
return;
}
- // tmp = false because later we want to query it with descriptor from SSTableReader
- SSTableReader reader = writer.closeAndOpenReader(maxAge);
- finished.add(reader);
- replaceReader(currentlyOpenedEarly, reader, false);
- moveStarts(reader, Functions.constant(reader.last), false);
+ // we leave it as a tmp file, but we open it early and add it to the dataTracker
+ SSTableReader reader = writer.openEarly(maxAge);
+ if (reader != null)
+ {
+ finishedOpenedEarly.add(reader);
+ replaceReader(currentlyOpenedEarly, reader, false);
+ moveStarts(reader, Functions.constant(reader.last), false);
+ }
+ finishedWriters.add(Pair.create(writer, reader));
currentlyOpenedEarly = null;
currentlyOpenedEarlyAt = 0;
writer = newWriter;
@@ -306,23 +339,48 @@ public class SSTableRewriter
{
finish(cleanupOldReaders, -1);
}
+
+ /**
+ * Finishes the new file(s)
+ *
+ * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the
+ * old files as compacted if cleanupOldReaders is set to true. Otherwise it is up to the caller to do those gymnastics
+ * (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+ *
+ * @param cleanupOldReaders if we should replace the old files with the new ones
+ * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
+ * the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
+ * repair time.
+ */
public void finish(boolean cleanupOldReaders, long repairedAt)
{
if (writer.getFilePointer() > 0)
{
- SSTableReader reader = repairedAt < 0 ?
- writer.closeAndOpenReader(maxAge) :
- writer.closeAndOpenReader(maxAge, repairedAt);
+ SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
finished.add(reader);
replaceReader(currentlyOpenedEarly, reader, false);
moveStarts(reader, Functions.constant(reader.last), false);
}
else
{
- writer.abort();
- writer = null;
+ writer.abort(true);
+ }
+ // make real sstables of the written ones:
+ for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+ {
+ if (w.left.getFilePointer() > 0)
+ {
+ SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
+ finished.add(newReader);
+ // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
+ replaceReader(w.right, newReader, false);
+ }
+ else
+ {
+ assert w.right == null;
+ w.left.abort(true);
+ }
}
-
if (!isOffline)
{
dataTracker.unmarkCompacting(finished);
@@ -337,10 +395,12 @@ public class SSTableRewriter
reader.releaseReference();
}
}
+ isFinished = true;
}
public List<SSTableReader> finished()
{
+ assert isFinished;
return finished;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
new file mode 100644
index 0000000..8b203ac
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.db.compaction.LazilyCompactedRow;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableRewriterTest extends SchemaLoader
+{
+ private static final String KEYSPACE = "Keyspace1";
+ private static final String CF = "Standard1";
+ @Test
+ public void basicTest() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ for (int j = 0; j < 100; j ++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE, key);
+ rm.add(CF, Util.cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+ assertEquals(1, sstables.size());
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false);
+ AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+ ICompactionScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while(scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+ writer.append(row);
+ }
+ writer.finish();
+
+ validateCFS(cfs);
+
+ }
+
+
+ @Test
+ public void testFileRemoval() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < 1000; i++)
+ cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ SSTableWriter writer = getWriter(cfs, dir);
+
+ for (int i = 0; i < 500; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s = writer.openEarly(1000);
+ assertFileCounts(dir.list(), 2, 3);
+ for (int i = 500; i < 1000; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s2 = writer.openEarly(1000);
+ assertTrue(s != s2);
+ assertFileCounts(dir.list(), 2, 3);
+ s.markObsolete();
+ s.releaseReference();
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 3);
+ writer.abort(false);
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testFileRemovalNoAbort() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < 1000; i++)
+ cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ SSTableWriter writer = getWriter(cfs, dir);
+
+ for (int i = 0; i < 500; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s = writer.openEarly(1000);
+ //assertFileCounts(dir.list(), 2, 3);
+ for (int i = 500; i < 1000; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ writer.closeAndOpenReader();
+ s.markObsolete();
+ s.releaseReference();
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+
+ @Test
+ public void testNumberOfFiles() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ rewriter.finish();
+ assertEquals(files, rewriter.finished().size());
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ // tmplink and tmp files should be gone:
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ rewriter.finish(false);
+ assertEquals(files, rewriter.finished().size());
+ assertEquals(files + 1, cfs.getSSTables().size());
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+
+ @Test
+ public void testNumberOfFiles_abort() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ DecoratedKey origFirst = s.first;
+ DecoratedKey origLast = s.last;
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ rewriter.abort();
+ Thread.sleep(1000);
+ assertEquals(1, cfs.getSSTables().size());
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+ assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+ validateCFS(cfs);
+
+ }
+
+ @Test
+ public void testNumberOfFiles_abort2() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ DecoratedKey origFirst = s.first;
+ DecoratedKey origLast = s.last;
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ if (files == 3)
+ {
+ //testing to abort when we have nothing written in the new file
+ rewriter.abort();
+ break;
+ }
+ }
+ Thread.sleep(1000);
+ assertEquals(1, cfs.getSSTables().size());
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+
+ assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+ assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ if (files == 3)
+ {
+ //testing to finish when we have nothing written in the new file
+ rewriter.finish();
+ break;
+ }
+ }
+ Thread.sleep(1000);
+ assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testNumberOfFiles_truncate() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ rewriter.finish();
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ cfs.truncateBlocking();
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testSmallFiles() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ SSTableReader s = writeFile(cfs, 400);
+ DecoratedKey origFirst = s.first;
+ cfs.addSSTable(s);
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(1000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+ {
+ assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
+ assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ }
+ }
+ rewriter.finish();
+ assertEquals(files, rewriter.finished().size());
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+ private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+ {
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < count / 100; i++)
+ cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ String filename = cfs.getTempSSTablePath(dir);
+
+ SSTableWriter writer = new SSTableWriter(filename,
+ 0,
+ 0,
+ cfs.metadata,
+ StorageService.getPartitioner(),
+ new MetadataCollector(cfs.metadata.comparator));
+
+ for (int i = 0; i < count * 5; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ return writer.closeAndOpenReader();
+ }
+
+ private void validateCFS(ColumnFamilyStore cfs)
+ {
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ assertFalse(sstable.isMarkedCompacted());
+ assertEquals(1, sstable.referenceCount());
+ }
+ assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+ }
+
+
+ private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+ {
+ int tmplinkcount = 0;
+ int tmpcount = 0;
+ for (String f : files)
+ {
+ if (f.contains("-tmplink-"))
+ tmplinkcount++;
+ if (f.contains("-tmp-"))
+ tmpcount++;
+ }
+ assertEquals(expectedtmplinkCount, tmplinkcount);
+ assertEquals(expectedtmpCount, tmpcount);
+ }
+
+ private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+ {
+ String filename = cfs.getTempSSTablePath(directory);
+ return new SSTableWriter(filename,
+ 0,
+ 0,
+ cfs.metadata,
+ StorageService.getPartitioner(),
+ new MetadataCollector(cfs.metadata.comparator));
+ }
+}
[4/5] Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index c3062f7,0000000..7d4b8f3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,1881 -1,0 +1,1887 @@@
+package org.apache.cassandra.io.sstable.format;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.cache.InstrumentingCache;
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedThrottledReader;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.*;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
+
+
+/**
+ * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
+ * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ */
+public abstract class SSTableReader extends SSTable
+{
+ private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
+
+ private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
+ private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
+
+ public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ long ts1 = o1.getMaxTimestamp();
+ long ts2 = o2.getMaxTimestamp();
+ return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
+ }
+ };
+
+ public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return o1.first.compareTo(o2.first);
+ }
+ };
+
+ public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
+
+ /**
+ * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
+ * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
+ * later than maxDataAge.
+ *
+ * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
+ *
+ * When a new sstable is flushed, maxDataAge is set to the time of creation.
+ * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
+ *
+ * The age is in milliseconds since epoc and is local to this host.
+ */
+ public final long maxDataAge;
+
+ public enum OpenReason
+ {
+ NORMAL,
+ EARLY,
+ METADATA_CHANGE
+ }
+
+ public final OpenReason openReason;
+
+ // indexfile and datafile: might be null before a call to load()
+ protected SegmentedFile ifile;
+ protected SegmentedFile dfile;
+
+ protected IndexSummary indexSummary;
+ protected IFilter bf;
+
+ protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+
+ protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
+
+ protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
+
+ protected final AtomicInteger references = new AtomicInteger(1);
+ // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
+ // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
+ protected final AtomicBoolean isCompacted = new AtomicBoolean(false);
+ protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
+
+ // not final since we need to be able to change level on a file.
+ protected volatile StatsMetadata sstableMetadata;
+
+ protected final AtomicLong keyCacheHit = new AtomicLong(0);
+ protected final AtomicLong keyCacheRequest = new AtomicLong(0);
+
+ /**
+ * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
+ * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
+ * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
+ * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
+ */
+ protected Object replaceLock = new Object();
+ protected SSTableReader replacedBy;
+ private SSTableReader replaces;
+ private SSTableDeletingTask deletingTask;
+ private Runnable runOnClose;
+
+ @VisibleForTesting
+ public RestorableMeter readMeter;
+ protected ScheduledFuture readMeterSyncFuture;
+
+ /**
+ * Calculate approximate key count.
+ * If cardinality estimator is available on all given sstables, then this method use them to estimate
+ * key count.
+ * If not, then this uses index summaries.
+ *
+ * @param sstables SSTables to calculate key count
+ * @return estimated key count
+ */
+ public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
+ {
+ long count = -1;
+
+ // check if cardinality estimator is available for all SSTables
+ boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
+ {
+ public boolean apply(SSTableReader sstable)
+ {
+ return sstable.descriptor.version.hasNewStatsFile();
+ }
+ });
+
+ // if it is, load them to estimate key count
+ if (cardinalityAvailable)
+ {
+ boolean failed = false;
+ ICardinality cardinality = null;
+ for (SSTableReader sstable : sstables)
+ {
+ try
+ {
+ CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
+ if (cardinality == null)
+ cardinality = metadata.cardinalityEstimator;
+ else
+ cardinality = cardinality.merge(metadata.cardinalityEstimator);
+ }
+ catch (IOException e)
+ {
+ logger.warn("Reading cardinality from Statistics.db failed.", e);
+ failed = true;
+ break;
+ }
+ catch (CardinalityMergeException e)
+ {
+ logger.warn("Cardinality merge failed.", e);
+ failed = true;
+ break;
+ }
+ }
+ if (cardinality != null && !failed)
+ count = cardinality.cardinality();
+ }
+
+ // if something went wrong above or cardinality is not available, calculate using index summary
+ if (count < 0)
+ {
+ for (SSTableReader sstable : sstables)
+ count += sstable.estimatedKeys();
+ }
+ return count;
+ }
+
+ public static SSTableReader open(Descriptor descriptor) throws IOException
+ {
+ CFMetaData metadata;
+ if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
+ {
+ int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
+ String parentName = descriptor.cfname.substring(0, i);
+ CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
+ ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
+ metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
+ }
+ else
+ {
+ metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
+ }
+ return open(descriptor, metadata);
+ }
+
+ public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
+ {
+ IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
+ ? new LocalPartitioner(metadata.getKeyValidator())
+ : StorageService.getPartitioner();
+ return open(desc, componentsFor(desc), metadata, p);
+ }
+
+ public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ {
+ return open(descriptor, components, metadata, partitioner, true);
+ }
+
+ public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
+ {
+ return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
+ }
+
+ /**
+ * Open SSTable reader to be used in batch mode(such as sstableloader).
+ *
+ * @param descriptor
+ * @param components
+ * @param metadata
+ * @param partitioner
+ * @return opened SSTableReader
+ * @throws IOException
+ */
+ public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ {
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+ EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+ StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, validationMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
+
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
+ SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
+ statsMetadata, OpenReason.NORMAL);
+
+ // special implementation of load to use non-pooled SegmentedFile builders
+ SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+ SegmentedFile.Builder dbuilder = sstable.compression
+ ? new CompressedSegmentedFile.Builder(null)
+ : new BufferedSegmentedFile.Builder();
+ if (!sstable.loadSummary(ibuilder, dbuilder))
+ sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
+ sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
+ sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
+ sstable.bf = FilterFactory.AlwaysPresent;
+
+ return sstable;
+ }
+
+ private static SSTableReader open(Descriptor descriptor,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ boolean validate) throws IOException
+ {
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+ EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+ StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, validationMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
+
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
+ SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
+ statsMetadata, OpenReason.NORMAL);
+
+ // load index and filter
+ long start = System.nanoTime();
+ sstable.load(validationMetadata);
+ logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
+ if (validate)
+ sstable.validate();
+
+ if (sstable.getKeyCache() != null)
+ logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
+
+ return sstable;
+ }
+
+ public static void logOpenException(Descriptor descriptor, IOException e)
+ {
+ if (e instanceof FileNotFoundException)
+ logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
+ else
+ logger.error("Corrupt sstable {}; skipped", descriptor, e);
+ }
+
+ public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+ final CFMetaData metadata,
+ final IPartitioner partitioner)
+ {
+ final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
+
+ ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
+ for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
+ {
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ SSTableReader sstable;
+ try
+ {
+ sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
+ }
+ catch (IOException ex)
+ {
+ logger.error("Corrupt sstable {}; skipped", entry, ex);
+ return;
+ }
+ sstables.add(sstable);
+ }
+ };
+ executor.submit(runnable);
+ }
+
+ executor.shutdown();
+ try
+ {
+ executor.awaitTermination(7, TimeUnit.DAYS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ return sstables;
+
+ }
+
+ /**
+ * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
+ */
+ public static SSTableReader internalOpen(Descriptor desc,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ SegmentedFile ifile,
+ SegmentedFile dfile,
+ IndexSummary isummary,
+ IFilter bf,
+ long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason)
+ {
+ assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
+
+ SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+
+ reader.bf = bf;
+ reader.ifile = ifile;
+ reader.dfile = dfile;
+ reader.indexSummary = isummary;
+
+ return reader;
+ }
+
+
+ private static SSTableReader internalOpen(final Descriptor descriptor,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ Long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason)
+ {
+ Factory readerFactory = descriptor.getFormat().getReaderFactory();
+
+ return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+ }
+
+ protected SSTableReader(final Descriptor desc,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason)
+ {
+ super(desc, components, metadata, partitioner);
+ this.sstableMetadata = sstableMetadata;
+ this.maxDataAge = maxDataAge;
+ this.openReason = openReason;
+
+ this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+
+ deletingTask = new SSTableDeletingTask(this);
+
+ // Don't track read rates for tables in the system keyspace. Also don't track reads for special operations (like early open)
+ // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
+ if (Keyspace.SYSTEM_KS.equals(desc.ksname) || openReason != OpenReason.NORMAL)
+ {
+ readMeter = null;
+ readMeterSyncFuture = null;
+ return;
+ }
+
+ readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+ // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
+ readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
+ {
+ public void run()
+ {
+ if (!isCompacted.get())
+ {
+ meterSyncThrottle.acquire();
+ SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
+ }
+ }
+ }, 1, 5, TimeUnit.MINUTES);
+ }
+
+ public static long getTotalBytes(Iterable<SSTableReader> sstables)
+ {
+ long sum = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ sum += sstable.onDiskLength();
+ }
+ return sum;
+ }
+
+ private void tidy(boolean release)
+ {
+ if (readMeterSyncFuture != null)
+ readMeterSyncFuture.cancel(false);
+
+ if (references.get() != 0)
+ {
+ throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
+ }
+
+ synchronized (replaceLock)
+ {
+ boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
+
+ if (replacedBy != null)
+ {
+ closeBf = replacedBy.bf != bf;
+ closeSummary = replacedBy.indexSummary != indexSummary;
+ closeFiles = replacedBy.dfile != dfile;
+ // if the replacement sstablereader uses a different path, clean up our paths
+ deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
+ }
+
+ if (replaces != null)
+ {
+ closeBf &= replaces.bf != bf;
+ closeSummary &= replaces.indexSummary != indexSummary;
+ closeFiles &= replaces.dfile != dfile;
+ deleteFiles &= !dfile.path.equals(replaces.dfile.path);
+ }
+
+ boolean deleteAll = false;
+ if (release && isCompacted.get())
+ {
+ assert replacedBy == null;
+ if (replaces != null)
+ {
+ replaces.replacedBy = null;
+ replaces.deletingTask = deletingTask;
+ replaces.markObsolete();
+ }
+ else
+ {
+ deleteAll = true;
+ }
+ }
+ else
+ {
+ if (replaces != null)
+ replaces.replacedBy = replacedBy;
+ if (replacedBy != null)
+ replacedBy.replaces = replaces;
+ }
+
+ scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
+ }
+ }
+
+ private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
+ {
+ if (references.get() != 0)
+ throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
+
+ final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+ final OpOrder.Barrier barrier;
+ if (cfs != null)
+ {
+ barrier = cfs.readOrdering.newBarrier();
+ barrier.issue();
+ }
+ else
+ barrier = null;
+
+ StorageService.tasks.execute(new Runnable()
+ {
+ public void run()
+ {
+ if (barrier != null)
+ barrier.await();
+ if (closeBf)
+ bf.close();
+ if (closeSummary)
+ indexSummary.close();
+ if (closeFiles)
+ {
+ ifile.cleanup();
+ dfile.cleanup();
+ }
+ if (runOnClose != null)
+ runOnClose.run();
+ if (deleteAll)
+ {
+ /**
+ * Do the OS a favour and suggest (using fadvice call) that we
+ * don't want to see pages of this SSTable in memory anymore.
+ *
+ * NOTE: We can't use madvice in java because it requires the address of
+ * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
+ */
+ dropPageCache();
+ deletingTask.run();
+ }
+ else if (deleteFiles)
+ {
+ FileUtils.deleteWithConfirm(new File(dfile.path));
+ FileUtils.deleteWithConfirm(new File(ifile.path));
+ }
+ }
+ });
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
+ }
+
+ public int hashCode()
+ {
+ return this.descriptor.hashCode();
+ }
+
+ public String getFilename()
+ {
+ return dfile.path;
+ }
+
+ public String getIndexFilename()
+ {
+ return ifile.path;
+ }
+
+ public void setTrackedBy(DataTracker tracker)
+ {
+ deletingTask.setTracker(tracker);
+ // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
+ // e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
+ // here when we know we're being wired into the rest of the server infrastructure.
+ keyCache = CacheService.instance.keyCache;
+ }
+
+ private void load(ValidationMetadata validation) throws IOException
+ {
+ if (metadata.getBloomFilterFpChance() == 1.0)
+ {
+ // bf is disabled.
+ load(false, true);
+ bf = FilterFactory.AlwaysPresent;
+ }
+ else if (!components.contains(Component.FILTER) || validation == null)
+ {
+ // bf is enabled, but filter component is missing.
+ load(true, true);
+ }
+ else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
+ {
+ // bf fp chance in sstable metadata and it has changed since compaction.
+ load(true, true);
+ }
+ else
+ {
+ // bf is enabled and fp chance matches the currently configured value.
+ load(false, true);
+ loadBloomFilter();
+ }
+ }
+
+ /**
+ * Load bloom filter from Filter.db file.
+ *
+ * @throws IOException
+ */
+ private void loadBloomFilter() throws IOException
+ {
+ DataInputStream stream = null;
+ try
+ {
+ stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
+ bf = FilterFactory.deserialize(stream, true);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(stream);
+ }
+ }
+
+ /**
+ * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
+ * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
+ * avoid persisting it to disk by setting this to false
+ */
+ private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
+ {
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ SegmentedFile.Builder dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+
+ boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+ if (recreateBloomFilter || !summaryLoaded)
+ buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
+
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
+ saveSummary(ibuilder, dbuilder);
+ }
+
+ /**
+ * Build index summary(and optionally bloom filter) by reading through Index.db file.
+ *
+ * @param recreateBloomFilter true if recreate bloom filter
+ * @param ibuilder
+ * @param dbuilder
+ * @param summaryLoaded true if index summary is already loaded and not need to build again
+ * @throws IOException
+ */
+ private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
+ {
+ // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+
+ try
+ {
+ long indexSize = primaryIndex.length();
+ long histogramCount = sstableMetadata.estimatedRowSize.count();
+ long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
+ ? histogramCount
+ : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
+
+ if (recreateBloomFilter)
+ bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
+
+ IndexSummaryBuilder summaryBuilder = null;
+ if (!summaryLoaded)
+ summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
+
+ long indexPosition;
+ RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
+
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+ RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
+ DecoratedKey decoratedKey = partitioner.decorateKey(key);
+ if (first == null)
+ first = decoratedKey;
+ last = decoratedKey;
+
+ if (recreateBloomFilter)
+ bf.add(decoratedKey.getKey());
+
+ // if summary was already read from disk we don't want to re-populate it using primary index
+ if (!summaryLoaded)
+ {
+ summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+ ibuilder.addPotentialBoundary(indexPosition);
+ dbuilder.addPotentialBoundary(indexEntry.position);
+ }
+ }
+
+ if (!summaryLoaded)
+ indexSummary = summaryBuilder.build(partitioner);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(primaryIndex);
+ }
+
+ first = getMinimalKey(first);
+ last = getMinimalKey(last);
+ }
+
+ /**
+ * Load index summary from Summary.db file if it exists.
+ *
+ * if loaded index summary has different index interval from current value stored in schema,
+ * then Summary.db file will be deleted and this returns false to rebuild summary.
+ *
+ * @param ibuilder
+ * @param dbuilder
+ * @return true if index summary is loaded successfully from Summary.db file.
+ */
+ public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+ if (!summariesFile.exists())
+ return false;
+
+ DataInputStream iStream = null;
+ try
+ {
+ iStream = new DataInputStream(new FileInputStream(summariesFile));
+ indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
+ first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ ibuilder.deserializeBounds(iStream);
+ dbuilder.deserializeBounds(iStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
+ // corrupted; delete it and fall back to creating a new summary
+ FileUtils.closeQuietly(iStream);
+ // delete it and fall back to creating a new summary
+ FileUtils.deleteWithConfirm(summariesFile);
+ return false;
+ }
+ finally
+ {
+ FileUtils.closeQuietly(iStream);
+ }
+
+ return true;
+ }
+
+ /**
+ * Save index summary to Summary.db file.
+ *
+ * @param ibuilder
+ * @param dbuilder
+ */
+ public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ saveSummary(ibuilder, dbuilder, indexSummary);
+ }
+
+ private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+ {
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+ if (summariesFile.exists())
+ FileUtils.deleteWithConfirm(summariesFile);
+
+ DataOutputStreamAndChannel oStream = null;
+ try
+ {
+ oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
+ IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
+ ByteBufferUtil.writeWithLength(first.getKey(), oStream);
+ ByteBufferUtil.writeWithLength(last.getKey(), oStream);
+ ibuilder.serializeBounds(oStream);
+ dbuilder.serializeBounds(oStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot save SSTable Summary: ", e);
+
+ // corrupted hence delete it and let it load it now.
+ if (summariesFile.exists())
+ FileUtils.deleteWithConfirm(summariesFile);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(oStream);
+ }
+ }
+
+ public void setReplacedBy(SSTableReader replacement)
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+ replacedBy = replacement;
+ replacement.replaces = this;
+ replacement.replaceLock = replaceLock;
+ }
+ }
+
+ public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+
+ if (newStart.compareTo(this.first) > 0)
+ {
+ if (newStart.compareTo(this.last) > 0)
+ {
+ this.runOnClose = new Runnable()
+ {
+ public void run()
+ {
+ CLibrary.trySkipCache(dfile.path, 0, 0);
+ CLibrary.trySkipCache(ifile.path, 0, 0);
+ runOnClose.run();
+ }
+ };
+ }
+ else
+ {
+ final long dataStart = getPosition(newStart, Operator.GE).position;
+ final long indexStart = getIndexScanPosition(newStart);
+ this.runOnClose = new Runnable()
+ {
+ public void run()
+ {
+ CLibrary.trySkipCache(dfile.path, 0, dataStart);
+ CLibrary.trySkipCache(ifile.path, 0, indexStart);
+ runOnClose.run();
+ }
+ };
+ }
+ }
+
+ SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
+ openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
+ replacement.readMeterSyncFuture = this.readMeterSyncFuture;
+ replacement.readMeter = this.readMeter;
+ replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
+ replacement.last = this.last;
+ setReplacedBy(replacement);
+ return replacement;
+ }
+ }
+
+ /**
+ * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
+ * be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
+ * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
+ * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
+ * @return a new SSTableReader
+ * @throws IOException
+ */
+ public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+
+ int minIndexInterval = metadata.getMinIndexInterval();
+ int maxIndexInterval = metadata.getMaxIndexInterval();
+ double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+
+ IndexSummary newSummary;
+ long oldSize = bytesOnDisk();
+
+ // We have to rebuild the summary from the on-disk primary index in three cases:
+ // 1. The sampling level went up, so we need to read more entries off disk
+ // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
+ // at full sampling (and consequently at any other sampling level)
+ // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+ if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
+ {
+ newSummary = buildSummaryAtLevel(samplingLevel);
+ }
+ else if (samplingLevel < indexSummary.getSamplingLevel())
+ {
+ // we can use the existing index summary to make a smaller one
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ SegmentedFile.Builder dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ saveSummary(ibuilder, dbuilder, newSummary);
+ }
+ else
+ {
+ throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
+ "no adjustments to min/max_index_interval");
+ }
+
+ long newSize = bytesOnDisk();
+ StorageMetrics.load.inc(newSize - oldSize);
+ parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
+
+ SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
+ openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
+ replacement.readMeterSyncFuture = this.readMeterSyncFuture;
+ replacement.readMeter = this.readMeter;
+ replacement.first = this.first;
+ replacement.last = this.last;
+ setReplacedBy(replacement);
+ return replacement;
+ }
+ }
+
+ private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
+ {
+ // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+ try
+ {
+ long indexSize = primaryIndex.length();
+ IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
+
+ long indexPosition;
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+ RowIndexEntry.Serializer.skip(primaryIndex);
+ }
+
+ return summaryBuilder.build(partitioner);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(primaryIndex);
+ }
+ }
+
+ public int getIndexSummarySamplingLevel()
+ {
+ return indexSummary.getSamplingLevel();
+ }
+
+ public long getIndexSummaryOffHeapSize()
+ {
+ return indexSummary.getOffHeapSize();
+ }
+
+ public int getMinIndexInterval()
+ {
+ return indexSummary.getMinIndexInterval();
+ }
+
+ public double getEffectiveIndexInterval()
+ {
+ return indexSummary.getEffectiveIndexInterval();
+ }
+
+ public void releaseSummary() throws IOException
+ {
+ indexSummary.close();
+ indexSummary = null;
+ }
+
+ private void validate()
+ {
+ if (this.first.compareTo(this.last) > 0)
+ throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
+ }
+
+ /**
+ * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
+ * modulo downsampling of the index summary).
+ */
+ public long getIndexScanPosition(RowPosition key)
+ {
+ return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
+ }
+
+ protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
+ {
+ if (binarySearchResult == -1)
+ return -1;
+ else
+ return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
+ }
+
+ protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
+ {
+ if (binarySearchResult < 0)
+ {
+ // binary search gives us the first index _greater_ than the key searched for,
+ // i.e., its insertion position
+ int greaterThan = (binarySearchResult + 1) * -1;
+ if (greaterThan == 0)
+ return -1;
+ return greaterThan - 1;
+ }
+ else
+ {
+ return binarySearchResult;
+ }
+ }
+
+ /**
+ * Returns the compression metadata for this sstable.
+ * @throws IllegalStateException if the sstable is not compressed
+ */
+ public CompressionMetadata getCompressionMetadata()
+ {
+ if (!compression)
+ throw new IllegalStateException(this + " is not compressed");
+
+ CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
+
+ //We need the parent cf metadata
+ String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
+ cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
+
+ return cmd;
+ }
+
+ /**
+ * For testing purposes only.
+ */
+ public void forceFilterFailures()
+ {
+ bf = FilterFactory.AlwaysPresent;
+ }
+
+ public IFilter getBloomFilter()
+ {
+ return bf;
+ }
+
+ public long getBloomFilterSerializedSize()
+ {
+ return bf.serializedSize();
+ }
+
+ /**
+ * @return An estimate of the number of keys in this SSTable based on the index summary.
+ */
+ public long estimatedKeys()
+ {
+ return indexSummary.getEstimatedKeyCount();
+ }
+
+ /**
+ * @param ranges
+ * @return An estimate of the number of keys for given ranges in this SSTable.
+ */
+ public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
+ {
+ long sampleKeyCount = 0;
+ List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
+ for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
+ sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
+
+ // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
+ long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
+ return Math.max(1, estimatedKeys);
+ }
+
+ /**
+ * Returns the number of entries in the IndexSummary. At full sampling, this is approximately 1/INDEX_INTERVALth of
+ * the keys in this SSTable.
+ */
+ public int getIndexSummarySize()
+ {
+ return indexSummary.size();
+ }
+
+ /**
+ * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
+ */
+ public int getMaxIndexSummarySize()
+ {
+ return indexSummary.getMaxNumberOfEntries();
+ }
+
+ /**
+ * Returns the key for the index summary entry at `index`.
+ */
+ public byte[] getIndexSummaryKey(int index)
+ {
+ return indexSummary.getKey(index);
+ }
+
+ private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
+ {
+ // use the index to determine a minimal section for each range
+ List<Pair<Integer,Integer>> positions = new ArrayList<>();
+
+ for (Range<Token> range : Range.normalize(ranges))
+ {
+ RowPosition leftPosition = range.left.maxKeyBound();
+ RowPosition rightPosition = range.right.maxKeyBound();
+
+ int left = summary.binarySearch(leftPosition);
+ if (left < 0)
+ left = (left + 1) * -1;
+ else
+ // left range are start exclusive
+ left = left + 1;
+ if (left == summary.size())
+ // left is past the end of the sampling
+ continue;
+
+ int right = Range.isWrapAround(range.left, range.right)
+ ? summary.size() - 1
+ : summary.binarySearch(rightPosition);
+ if (right < 0)
+ {
+ // range are end inclusive so we use the previous index from what binarySearch give us
+ // since that will be the last index we will return
+ right = (right + 1) * -1;
+ if (right == 0)
+ // Means the first key is already stricly greater that the right bound
+ continue;
+ right--;
+ }
+
+ if (left > right)
+ // empty range
+ continue;
+ positions.add(Pair.create(left, right));
+ }
+ return positions;
+ }
+
+ public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
+ {
+ final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
+
+ if (indexRanges.isEmpty())
+ return Collections.emptyList();
+
+ return new Iterable<DecoratedKey>()
+ {
+ public Iterator<DecoratedKey> iterator()
+ {
+ return new Iterator<DecoratedKey>()
+ {
+ private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
+ private Pair<Integer, Integer> current;
+ private int idx;
+
+ public boolean hasNext()
+ {
+ if (current == null || idx > current.right)
+ {
+ if (rangeIter.hasNext())
+ {
+ current = rangeIter.next();
+ idx = current.left;
+ return true;
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+ public DecoratedKey next()
+ {
+ byte[] bytes = indexSummary.getKey(idx++);
+ return partitioner.decorateKey(ByteBuffer.wrap(bytes));
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
+ * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
+ */
+ public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
+ {
+ // use the index to determine a minimal section for each range
+ List<Pair<Long,Long>> positions = new ArrayList<>();
+ for (Range<Token> range : Range.normalize(ranges))
+ {
+ AbstractBounds<RowPosition> keyRange = range.toRowBounds();
+ RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
+ long left = idxLeft == null ? -1 : idxLeft.position;
+ if (left == -1)
+ // left is past the end of the file
+ continue;
+ RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
+ long right = idxRight == null ? -1 : idxRight.position;
+ if (right == -1 || Range.isWrapAround(range.left, range.right))
+ // right is past the end of the file, or it wraps
+ right = uncompressedLength();
+ if (left == right)
+ // empty range
+ continue;
+ positions.add(Pair.create(left, right));
+ }
+ return positions;
+ }
+
+ public void invalidateCacheKey(DecoratedKey key)
+ {
+ KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+ keyCache.remove(cacheKey);
+ }
+
+ public void cacheKey(DecoratedKey key, RowIndexEntry info)
+ {
+ CachingOptions caching = metadata.getCaching();
+
+ if (!caching.keyCache.isEnabled()
+ || keyCache == null
+ || keyCache.getCapacity() == 0)
+ {
+ return;
+ }
+
+ KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+ logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
+ keyCache.put(cacheKey, info);
+ }
+
+ public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
+ {
+ return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
+ }
+
+ protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
+ {
+ if (keyCache != null && keyCache.getCapacity() > 0) {
+ if (updateStats)
+ {
+ RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
+ keyCacheRequest.incrementAndGet();
+ if (cachedEntry != null)
+ keyCacheHit.incrementAndGet();
+ return cachedEntry;
+ }
+ else
+ {
+ return keyCache.getInternal(unifiedKey);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get position updating key cache and stats.
+ * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
+ */
+ public RowIndexEntry getPosition(RowPosition key, Operator op)
+ {
+ return getPosition(key, op, true);
+ }
+
+ /**
+ * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
+ * allow key selection by token bounds but only if op != * EQ
+ * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
+ * @param updateCacheAndStats true if updating stats and cache
+ * @return The index entry corresponding to the key, or null if the key is not present
+ */
+ public abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats);
+
+ //Corresponds to a name column
+ public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
+ public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
+
+ //Corresponds to a slice query
+ public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
+ public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
+
+ /**
+ * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
+ */
+ public DecoratedKey firstKeyBeyond(RowPosition token)
+ {
+ long sampledPosition = getIndexScanPosition(token);
+ if (sampledPosition == -1)
+ sampledPosition = 0;
+
+ Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
+ while (segments.hasNext())
+ {
+ FileDataInput in = segments.next();
+ try
+ {
+ while (!in.isEOF())
+ {
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+ DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+ if (indexDecoratedKey.compareTo(token) > 0)
+ return indexDecoratedKey;
+
+ RowIndexEntry.Serializer.skip(in);
+ }
+ }
+ catch (IOException e)
+ {
+ markSuspect();
+ throw new CorruptSSTableException(e, in.getPath());
+ }
+ finally
+ {
+ FileUtils.closeQuietly(in);
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * @return The length in bytes of the data for this SSTable. For
+ * compressed files, this is not the same thing as the on disk size (see
+ * onDiskLength())
+ */
+ public long uncompressedLength()
+ {
+ return dfile.length;
+ }
+
+ /**
+ * @return The length in bytes of the on disk size for this SSTable. For
+ * compressed files, this is not the same thing as the data length (see
+ * length())
+ */
+ public long onDiskLength()
+ {
+ return dfile.onDiskLength;
+ }
+
+ public boolean acquireReference()
+ {
+ while (true)
+ {
+ int n = references.get();
+ if (n <= 0)
+ return false;
+ if (references.compareAndSet(n, n + 1))
+ return true;
+ }
+ }
+
++ @VisibleForTesting
++ public int referenceCount()
++ {
++ return references.get();
++ }
++
+ /**
+ * Release reference to this SSTableReader.
+ * If there is no one referring to this SSTable, and is marked as compacted,
+ * all resources are cleaned up and files are deleted eventually.
+ */
+ public void releaseReference()
+ {
+ if (references.decrementAndGet() == 0)
+ tidy(true);
+ assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
+ }
+
+ /**
+ * Mark the sstable as obsolete, i.e., compacted into newer sstables.
+ *
+ * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
+ * except for threads holding a reference.
+ *
+ * @return true if the this is the first time the file was marked obsolete. Calling this
+ * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
+ */
+ public boolean markObsolete()
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Marking {} compacted", getFilename());
+
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+ }
+ return !isCompacted.getAndSet(true);
+ }
+
+ public boolean isMarkedCompacted()
+ {
+ return isCompacted.get();
+ }
+
+ public void markSuspect()
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
+
+ isSuspect.getAndSet(true);
+ }
+
+ public boolean isMarkedSuspect()
+ {
+ return isSuspect.get();
+ }
+
+
+ /**
+ * I/O SSTableScanner
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ICompactionScanner getScanner()
+ {
+ return getScanner((RateLimiter) null);
+ }
+
+ public ICompactionScanner getScanner(RateLimiter limiter)
+ {
+ return getScanner(DataRange.allData(partitioner), limiter);
+ }
+
+ /**
+ *
+ * @param dataRange filter to use when reading the columns
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ICompactionScanner getScanner(DataRange dataRange)
+ {
+ return getScanner(dataRange, null);
+ }
+
+ /**
+ * Direct I/O SSTableScanner over a defined range of tokens.
+ *
+ * @param range the range of keys to cover
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
+ {
+ if (range == null)
+ return getScanner(limiter);
+ return getScanner(Collections.singletonList(range), limiter);
+ }
+
+ /**
+ * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
+ *
+ * @param ranges the range of keys to cover
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public abstract ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
+
+ /**
+ *
+ * @param dataRange filter to use when reading the columns
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public abstract ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter);
+
+
+
+ public FileDataInput getFileDataInput(long position)
+ {
+ return dfile.getSegment(position);
+ }
+
+ /**
+ * Tests if the sstable contains data newer than the given age param (in localhost currentMilli time).
+ * This works in conjunction with maxDataAge which is an upper bound on the create of data in this sstable.
+ * @param age The age to compare the maxDataAre of this sstable. Measured in millisec since epoc on this host
+ * @return True iff this sstable contains data that's newer than the given age parameter.
+ */
+ public boolean newSince(long age)
+ {
+ return maxDataAge > age;
+ }
+
+ public void createLinks(String snapshotDirectoryPath)
+ {
+ for (Component component : components)
+ {
+ File sourceFile = new File(descriptor.filenameFor(component));
+ File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+ FileUtils.createHardLink(sourceFile, targetLink);
+ }
+ }
+
+ public boolean isRepaired()
+ {
+ return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
+ }
+
+ public SSTableReader getCurrentReplacement()
+ {
+ synchronized (replaceLock)
+ {
+ SSTableReader cur = this, next = replacedBy;
+ while (next != null)
+ {
+ cur = next;
+ next = next.replacedBy;
+ }
+ return cur;
+ }
+ }
+
+ /**
+ * TODO: Move someplace reusable
+ */
+ public abstract static class Operator
+ {
+ public static final Operator EQ = new Equals();
+ public static final Operator GE = new GreaterThanOrEqualTo();
+ public static final Operator GT = new GreaterThan();
+
+ /**
+ * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
+ * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
+ */
+ public abstract int apply(int comparison);
+
+ final static class Equals extends Operator
+ {
+ public int apply(int comparison) { return -comparison; }
+ }
+
+ final static class GreaterThanOrEqualTo extends Operator
+ {
+ public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
+ }
+
+ final static class GreaterThan extends Operator
+ {
+ public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
+ }
+ }
+
+ public long getBloomFilterFalsePositiveCount()
+ {
+ return bloomFilterTracker.getFalsePositiveCount();
+ }
+
+ public long getRecentBloomFilterFalsePositiveCount()
+ {
+ return bloomFilterTracker.getRecentFalsePositiveCount();
+ }
+
+ public long getBloomFilterTruePositiveCount()
+ {
+ return bloomFilterTracker.getTruePositiveCount();
+ }
+
+ public long getRecentBloomFilterTruePositiveCount()
+ {
+ return bloomFilterTracker.getRecentTruePositiveCount();
+ }
+
+ public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
+ {
+ return keyCache;
+ }
+
+ public EstimatedHistogram getEstimatedRowSize()
+ {
+ return sstableMetadata.estimatedRowSize;
+ }
+
+ public EstimatedHistogram getEstimatedColumnCount()
+ {
+ return sstableMetadata.estimatedColumnCount;
+ }
+
+ public double getEstimatedDroppableTombstoneRatio(int gcBefore)
+ {
+ return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
+ }
+
+ public double getDroppableTombstonesBefore(int gcBefore)
+ {
+ return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
+ }
+
+ public double getCompressionRatio()
+ {
+ return sstableMetadata.compressionRatio;
+ }
+
+ public ReplayPosition getReplayPosition()
+ {
+ return sstableMetadata.replayPosition;
+ }
+
+ public long getMinTimestamp()
+ {
+ return sstableMetadata.minTimestamp;
+ }
+
+ public long getMaxTimestamp()
+ {
+ return sstableMetadata.maxTimestamp;
+ }
+
+ public Set<Integer> getAncestors()
+ {
+ try
+ {
+ CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
+ return compactionMetadata.ancestors;
+ }
+ catch (IOException e)
+ {
+ SSTableReader.logOpenException(descriptor, e);
+ return Collections.emptySet();
+ }
+ }
+
+ public int getSSTableLevel()
+ {
+ return sstableMetadata.sstableLevel;
+ }
+
+ /**
+ * Reloads the sstable metadata from disk.
+ *
+ * Called after level is changed on sstable, for example if the sstable is dropped to L0
+ *
+ * Might be possible to remove in future versions
+ *
+ * @throws IOException
+ */
+ public void reloadSSTableMetadata() throws IOException
+ {
+ this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
+ }
+
+ public StatsMetadata getSSTableMetadata()
+ {
+ return sstableMetadata;
+ }
+
+ public RandomAccessReader openDataReader(RateLimiter limiter)
+ {
+ assert limiter != null;
+ return compression
+ ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
+ : ThrottledReader.open(new File(getFilename()), limiter);
+ }
+
+ public RandomAccessReader openDataReader()
+ {
+ return compression
+ ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
+ : RandomAccessReader.open(new File(getFilename()));
+ }
+
+ public RandomAccessReader openIndexReader()
+ {
+ return RandomAccessReader.open(new File(getIndexFilename()));
+ }
+
+ /**
+ * @param component component to get timestamp.
+ * @return last modified time for given component. 0 if given component does not exist or IO error occurs.
+ */
+ public long getCreationTimeFor(Component component)
+ {
+ return new File(descriptor.filenameFor(component)).lastModified();
+ }
+
+ /**
+ * @return Number of key cache hit
+ */
+ public long getKeyCacheHit()
+ {
+ return keyCacheHit.get();
+ }
+
+ /**
+ * @return Number of key cache request
+ */
+ public long getKeyCacheRequest()
+ {
+ return keyCacheRequest.get();
+ }
+
+ /**
+ * @param sstables
+ * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false.
+ */
+ public static boolean acquireReferences(Iterable<SSTableReader> sstables)
+ {
+ SSTableReader failed = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (!sstable.acquireReference())
+ {
+ failed = sstable;
+ break;
+ }
+ }
+
+ if (failed == null)
+ return true;
+
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable == failed)
+ break;
+ sstable.releaseReference();
+ }
+ return false;
+ }
+
+ public static void releaseReferences(Iterable<SSTableReader> sstables)
+ {
+ for (SSTableReader sstable : sstables)
+ {
+ sstable.releaseReference();
+ }
+ }
+
+ private void dropPageCache()
+ {
+ dropPageCache(dfile.path);
+ dropPageCache(ifile.path);
+ }
+
+ private void dropPageCache(String filePath)
+ {
+ RandomAccessFile file = null;
+
+ try
+ {
+ file = new RandomAccessFile(filePath, "r");
+
+ int fd = CLibrary.getfd(file.getFD());
+
+ if (fd > 0)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Dropping page cache of file %s.", filePath));
+
+ CLibrary.trySkipCache(fd, 0, 0);
+ }
+ }
+ catch (IOException e)
+ {
+ // we don't care if cache cleanup fails
+ }
+ finally
+ {
+ FileUtils.closeQuietly(file);
+ }
+ }
+
+ /**
+ * Increment the total row read count and read rate for this SSTable. This should not be incremented for range
+ * slice queries, row cache hits, or non-query reads, like compaction.
+ */
+ public void incrementReadCount()
+ {
+ if (readMeter != null)
+ readMeter.mark();
+ }
+
+ protected class EmptyCompactionScanner implements ICompactionScanner
+ {
+ private final String filename;
+
+ public EmptyCompactionScanner(String filename)
+ {
+ this.filename = filename;
+ }
+
+ public long getLengthInBytes()
+ {
+ return 0;
+ }
+
+ public long getCurrentPosition()
+ {
+ return 0;
+ }
+
+ public String getBackingFiles()
+ {
+ return filename;
+ }
+
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ public OnDiskAtomIterator next()
+ {
+ return null;
+ }
+
+ public void close() throws IOException { }
+
+ public void remove() { }
+ }
+
+ public static class SizeComparator implements Comparator<SSTableReader>
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+ }
+ }
+
+ public static abstract class Factory
+ {
+ public abstract SSTableReader open(final Descriptor descriptor,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ Long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index e6e5d55,5ed4f4a..fffc310
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -29,17 -30,9 +30,18 @@@ import java.util.Collection
import java.util.List;
import java.util.concurrent.ExecutionException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Test;
+
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
- import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.db.ArrayBackedSortedColumns;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
@@@ -47,13 -40,14 +49,15 @@@ import org.apache.cassandra.db.Mutation
import org.apache.cassandra.dht.BytesToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
+ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+
import org.junit.After;
+import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Iterables;
@@@ -134,36 -94,65 +138,72 @@@ public class AntiCompactionTes
assertEquals(repairedKeys, 4);
assertEquals(nonRepairedKeys, 6);
}
+
+ @Test
+ public void antiCompactionSizeTest() throws ExecutionException, InterruptedException, IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.disableAutoCompaction();
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ long origSize = s.bytesOnDisk();
+ System.out.println(cfs.metric.liveDiskSpaceUsed.count());
+ Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
+ Collection<SSTableReader> sstables = cfs.getSSTables();
+ SSTableReader.acquireReferences(sstables);
+ CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345);
+ long sum = 0;
+ for (SSTableReader x : cfs.getSSTables())
+ sum += x.bytesOnDisk();
+ assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.count(), 100000);
+
+ }
+
+ private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+ {
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < count; i++)
+ cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ String filename = cfs.getTempSSTablePath(dir);
+
- SSTableWriter writer = new SSTableWriter(filename,
- 0,
- 0,
- cfs.metadata,
- StorageService.getPartitioner(),
- new MetadataCollector(cfs.metadata.comparator));
++ SSTableWriter writer = SSTableWriter.create(filename,0,0);
+
+ for (int i = 0; i < count * 5; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ return writer.closeAndOpenReader();
+ }
- @Test
- public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
+ public void generateSStable(ColumnFamilyStore store, String Suffix)
{
- ColumnFamilyStore store = prepareColumnFamilyStore();
- Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
- assertEquals(store.getSSTables().size(), sstables.size());
- Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
- List<Range<Token>> ranges = Arrays.asList(range);
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i) + "-" + Suffix);
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+ for (int j = 0; j < 10; j++)
+ rm.add("Standard1", Util.cellname(Integer.toString(j)),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ timestamp,
+ 0);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ }
- SSTableReader.acquireReferences(sstables);
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
+ @Test
+ public void antiCompactTenSTC() throws InterruptedException, ExecutionException, IOException{
+ antiCompactTen("SizeTieredCompactionStrategy");
+ }
- assertThat(store.getSSTables().size(), is(1));
- assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
+ @Test
+ public void antiCompactTenLC() throws InterruptedException, ExecutionException, IOException{
+ antiCompactTen("LeveledCompactionStrategy");
}
- private ColumnFamilyStore prepareColumnFamilyStore()
+ public void antiCompactTen(String compactionStrategy) throws InterruptedException, ExecutionException, IOException
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
[2/5] git commit: Refactor how we track live size
Posted by ma...@apache.org.
Refactor how we track live size
Patch by marcuse; reviewed by yukim for CASSANDRA-7852
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5160c916
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5160c916
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5160c916
Branch: refs/heads/trunk
Commit: 5160c916c90886f69023ddba0078a624e5cf202d
Parents: 9c316e7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 17 14:15:46 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 16:39:19 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 109 ++++++++++++-------
.../db/compaction/CompactionManager.java | 26 ++---
.../cassandra/db/compaction/CompactionTask.java | 7 +-
.../cassandra/db/compaction/Scrubber.java | 12 +-
.../cassandra/db/compaction/Upgrader.java | 31 +++---
.../io/sstable/IndexSummaryManager.java | 2 +-
.../cassandra/io/sstable/SSTableRewriter.java | 90 ++++-----------
.../db/compaction/AntiCompactionTest.java | 48 +++++++-
.../io/sstable/IndexSummaryManagerTest.java | 2 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 57 ++++++----
12 files changed, 219 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 681d616..32083cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.2
+ * Refactor how we track live size (CASSANDRA-7852)
* Make sure unfinished compaction files are removed (CASSANDRA-8124)
* Fix shutdown when run as Windows service (CASSANDRA-8136)
* Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7393323..7df2b75 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -254,33 +254,36 @@ public class DataTracker
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
{
- replace(sstables, Collections.<SSTableReader>emptyList());
+ removeSSTablesFromTracker(sstables);
+ releaseReferences(sstables, false);
notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
}
+ /**
+ *
+ * @param oldSSTables
+ * @param allReplacements
+ * @param compactionType
+ */
// note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners
// that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call
- public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
+ public void markCompactedSSTablesReplaced(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> allReplacements, OperationType compactionType)
{
- replace(sstables, Collections.<SSTableReader>emptyList());
- notifySSTablesChanged(sstables, allReplacements, compactionType);
- for (SSTableReader sstable : allReplacements)
- {
- long bytesOnDisk = sstable.bytesOnDisk();
- cfstore.metric.totalDiskSpaceUsed.inc(bytesOnDisk);
- cfstore.metric.liveDiskSpaceUsed.inc(bytesOnDisk);
- }
+ removeSSTablesFromTracker(oldSSTables);
+ releaseReferences(oldSSTables, false);
+ notifySSTablesChanged(oldSSTables, allReplacements, compactionType);
+ addNewSSTablesSize(allReplacements);
}
public void addInitialSSTables(Collection<SSTableReader> sstables)
{
- replace(Collections.<SSTableReader>emptyList(), sstables);
+ addSSTablesToTracker(sstables);
// no notifications or backup necessary
}
public void addSSTables(Collection<SSTableReader> sstables)
{
- replace(Collections.<SSTableReader>emptyList(), sstables);
+ addSSTablesToTracker(sstables);
for (SSTableReader sstable : sstables)
{
maybeIncrementallyBackup(sstable);
@@ -289,6 +292,32 @@ public class DataTracker
}
/**
+ * Replaces existing sstables with new instances, makes sure compaction strategies have the correct instance
+ *
+ * @param toReplace
+ * @param replaceWith
+ */
+ public void replaceWithNewInstances(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+ {
+ replaceReaders(toReplace, replaceWith, true);
+ }
+
+ /**
+ * Adds the early opened files to the data tracker, but does not tell compaction strategies about it
+ *
+ * note that we dont track the live size of these sstables
+ * @param toReplace
+ * @param replaceWith
+ */
+ public void replaceEarlyOpenedFiles(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+ {
+ for (SSTableReader s : toReplace)
+ assert s.openReason.equals(SSTableReader.OpenReason.EARLY);
+ // note that we can replace an early opened file with a real one
+ replaceReaders(toReplace, replaceWith, false);
+ }
+
+ /**
* removes all sstables that are not busy compacting.
*/
public void unreferenceSSTables()
@@ -310,7 +339,8 @@ public class DataTracker
return;
}
notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
- postReplace(notCompacting, Collections.<SSTableReader>emptySet(), true);
+ removeOldSSTablesSize(notCompacting);
+ releaseReferences(notCompacting, true);
}
/**
@@ -344,11 +374,11 @@ public class DataTracker
void init()
{
view.set(new View(
- ImmutableList.of(new Memtable(cfstore)),
- ImmutableList.<Memtable>of(),
- Collections.<SSTableReader>emptySet(),
- Collections.<SSTableReader>emptySet(),
- SSTableIntervalTree.empty()));
+ ImmutableList.of(new Memtable(cfstore)),
+ ImmutableList.<Memtable>of(),
+ Collections.<SSTableReader>emptySet(),
+ Collections.<SSTableReader>emptySet(),
+ SSTableIntervalTree.empty()));
}
/**
@@ -358,7 +388,7 @@ public class DataTracker
* @param oldSSTables replaced readers
* @param newSSTables replacement readers
*/
- public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
+ private void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
{
View currentView, newView;
do
@@ -369,7 +399,7 @@ public class DataTracker
while (!view.compareAndSet(currentView, newView));
if (!oldSSTables.isEmpty() && notify)
- notifySSTablesChanged(oldSSTables, newSSTables, OperationType.COMPACTION);
+ notifySSTablesChanged(oldSSTables, newSSTables, OperationType.UNKNOWN);
for (SSTableReader sstable : newSSTables)
sstable.setTrackedBy(this);
@@ -378,29 +408,28 @@ public class DataTracker
sstable.releaseReference();
}
- private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+ private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
{
- if (!cfstore.isValid())
- {
- removeOldSSTablesSize(replacements, false);
- replacements = Collections.emptyList();
- }
-
View currentView, newView;
do
{
currentView = view.get();
- newView = currentView.replace(oldSSTables, replacements);
+ newView = currentView.replace(oldSSTables, Collections.<SSTableReader>emptyList());
}
while (!view.compareAndSet(currentView, newView));
-
- postReplace(oldSSTables, replacements, false);
+ removeOldSSTablesSize(oldSSTables);
}
- private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements, boolean tolerateCompacted)
+ private void addSSTablesToTracker(Collection<SSTableReader> sstables)
{
- addNewSSTablesSize(replacements);
- removeOldSSTablesSize(oldSSTables, tolerateCompacted);
+ View currentView, newView;
+ do
+ {
+ currentView = view.get();
+ newView = currentView.replace(Collections.<SSTableReader>emptyList(), sstables);
+ }
+ while (!view.compareAndSet(currentView, newView));
+ addNewSSTablesSize(sstables);
}
private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
@@ -418,7 +447,7 @@ public class DataTracker
}
}
- private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
+ private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
{
for (SSTableReader sstable : oldSSTables)
{
@@ -428,13 +457,15 @@ public class DataTracker
long size = sstable.bytesOnDisk();
StorageMetrics.load.dec(size);
cfstore.metric.liveDiskSpaceUsed.dec(size);
+ }
+ }
- // tolerateCompacted will be true when the CFS is no longer valid (dropped). If there were ongoing
- // compactions when it was invalidated, sstables may already be marked compacted, so we should
- // tolerate that (see CASSANDRA-5957)
+ private void releaseReferences(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
+ {
+ for (SSTableReader sstable : oldSSTables)
+ {
boolean firstToCompact = sstable.markObsolete();
- assert (tolerateCompacted || firstToCompact) : sstable + " was already marked compacted";
-
+ assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
sstable.releaseReference();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 51f45b8..84c3cb5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -688,8 +688,9 @@ public class CompactionManager implements CompactionManagerMBean
CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
- SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
-
+ Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+ SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
+ List<SSTableReader> finished;
try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)))
{
writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
@@ -711,7 +712,8 @@ public class CompactionManager implements CompactionManagerMBean
// flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
cfs.indexManager.flushIndexesBlocking();
- writer.finish();
+ finished = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP);
}
catch (Throwable e)
{
@@ -724,17 +726,16 @@ public class CompactionManager implements CompactionManagerMBean
metrics.finishCompaction(ci);
}
- List<SSTableReader> results = writer.finished();
- if (!results.isEmpty())
+ if (!finished.isEmpty())
{
String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long startsize = sstable.onDiskLength();
long endsize = 0;
- for (SSTableReader newSstable : results)
+ for (SSTableReader newSstable : finished)
endsize += newSstable.onDiskLength();
double ratio = (double) endsize / (double) startsize;
- logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+ logger.info(String.format(format, finished.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
}
}
@@ -994,8 +995,8 @@ public class CompactionManager implements CompactionManagerMBean
sstableAsSet.add(sstable);
File destination = cfs.directories.getDirectoryForNewSSTables();
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
@@ -1024,11 +1025,10 @@ public class CompactionManager implements CompactionManagerMBean
}
// we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
// so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
- repairedSSTableWriter.finish(false, repairedAt);
- unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
// add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
- anticompactedSSTables.addAll(repairedSSTableWriter.finished());
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
+ anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
}
catch (Throwable e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d2ae04a..b442482 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -152,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask
{
AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
Iterator<AbstractCompactedRow> iter = ci.iterator();
-
+ List<SSTableReader> newSStables;
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -161,7 +161,7 @@ public class CompactionTask extends AbstractCompactionTask
if (collector != null)
collector.beginCompaction(ci);
long lastCheckObsoletion = start;
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
try
{
if (!iter.hasNext())
@@ -197,7 +197,7 @@ public class CompactionTask extends AbstractCompactionTask
}
// don't replace old sstables yet, as we need to mark the compaction finished in the system table
- writer.finish(false);
+ newSStables = writer.finish();
}
catch (Throwable t)
{
@@ -217,7 +217,6 @@ public class CompactionTask extends AbstractCompactionTask
}
Collection<SSTableReader> oldSStables = this.sstables;
- List<SSTableReader> newSStables = writer.finished();
if (!offline)
cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b3d098d..0cd71f2 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -22,6 +22,7 @@ import java.io.*;
import java.util.*;
import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.*;
@@ -107,7 +108,8 @@ public class Scrubber implements Closeable
public void scrub()
{
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
- SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, isOffline);
+ Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+ SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -256,9 +258,11 @@ public class Scrubber implements Closeable
}
// finish obsoletes the old sstable
- writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
- if (!writer.finished().isEmpty())
- newSstable = writer.finished().get(0);
+ List<SSTableReader> finished = writer.finish(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+ if (!finished.isEmpty())
+ newSstable = finished.get(0);
+ if (!isOffline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB);
}
catch (Throwable t)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index f102fef..39f668d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.*;
import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
@@ -35,7 +36,6 @@ public class Upgrader
{
private final ColumnFamilyStore cfs;
private final SSTableReader sstable;
- private final Set<SSTableReader> toUpgrade;
private final File directory;
private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -49,7 +49,6 @@ public class Upgrader
{
this.cfs = cfs;
this.sstable = sstable;
- this.toUpgrade = new HashSet<>(Collections.singleton(sstable));
this.outputHandler = outputHandler;
this.directory = new File(sstable.getFilename()).getParentFile();
@@ -57,8 +56,8 @@ public class Upgrader
this.controller = new UpgradeController(cfs);
this.strategy = cfs.getCompactionStrategy();
- long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableBytes());
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable)));
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategy.getMaxSSTableBytes());
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}
@@ -68,27 +67,22 @@ public class Upgrader
// Get the max timestamp of the precompacted sstables
// and adds generation of live ancestors
- // -- note that we always only have one SSTable in toUpgrade here:
- for (SSTableReader sstable : toUpgrade)
+ sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+ for (Integer i : sstable.getAncestors())
{
- sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
- for (Integer i : sstable.getAncestors())
- {
- if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
- sstableMetadataCollector.addAncestor(i);
- }
- sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
+ if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+ sstableMetadataCollector.addAncestor(i);
}
-
+ sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
}
public void upgrade()
{
outputHandler.output("Upgrading " + sstable);
-
- SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
- try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
+ Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
+ SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
{
Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
@@ -98,7 +92,8 @@ public class Upgrader
writer.append(row);
}
- writer.finish();
+ List<SSTableReader> sstables = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(toUpgrade, sstables, OperationType.UPGRADE_SSTABLES);
outputHandler.output("Upgrade of " + sstable + " complete.");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index cc60b4d..65b25a4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -416,7 +416,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
for (DataTracker tracker : replacedByTracker.keySet())
{
- tracker.replaceReaders(replacedByTracker.get(tracker), replacementsByTracker.get(tracker), true);
+ tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
newSSTables.addAll(replacementsByTracker.get(tracker));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 2c9fe7e..4d5a06f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.io.sstable;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -36,7 +35,6 @@ import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -67,8 +65,6 @@ public class SSTableRewriter
preemptiveOpenInterval = interval;
}
- private boolean isFinished = false;
-
@VisibleForTesting
static void overrideOpenInterval(long size)
{
@@ -86,16 +82,14 @@ public class SSTableRewriter
private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
- private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
- private final OperationType rewriteType; // the type of rewrite/compaction being performed
private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
- public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline)
+ public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
{
this.rewriting = rewriting;
for (SSTableReader sstable : rewriting)
@@ -106,7 +100,6 @@ public class SSTableRewriter
this.dataTracker = cfs.getDataTracker();
this.cfs = cfs;
this.maxAge = maxAge;
- this.rewriteType = rewriteType;
this.isOffline = isOffline;
}
@@ -147,28 +140,18 @@ public class SSTableRewriter
// attempts to append the row, if fails resets the writer position
public RowIndexEntry tryAppend(AbstractCompactedRow row)
{
- mark();
+ writer.mark();
try
{
return append(row);
}
catch (Throwable t)
{
- resetAndTruncate();
+ writer.resetAndTruncate();
throw t;
}
}
- private void mark()
- {
- writer.mark();
- }
-
- private void resetAndTruncate()
- {
- writer.resetAndTruncate();
- }
-
private void maybeReopenEarly(DecoratedKey key)
{
if (FBUtilities.isUnix() && writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
@@ -186,7 +169,7 @@ public class SSTableRewriter
SSTableReader reader = writer.openEarly(maxAge);
if (reader != null)
{
- replaceReader(currentlyOpenedEarly, reader, false);
+ replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
currentlyOpenedEarly = reader;
currentlyOpenedEarlyAt = writer.getFilePointer();
moveStarts(reader, Functions.constant(reader.last), false);
@@ -222,7 +205,7 @@ public class SSTableRewriter
// releases reference in replaceReaders
if (!isOffline)
{
- dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false);
+ dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList());
dataTracker.unmarkCompacting(close);
}
}
@@ -276,12 +259,14 @@ public class SSTableRewriter
}));
}
}
- replaceReaders(toReplace, replaceWith, true);
+ cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith);
rewriting.removeAll(toReplace);
rewriting.addAll(replaceWith);
}
- private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith, boolean notify)
+
+
+ private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
{
if (isOffline)
return;
@@ -296,14 +281,7 @@ public class SSTableRewriter
dataTracker.markCompacting(Collections.singleton(replaceWith));
toReplaceSet = Collections.emptySet();
}
- replaceReaders(toReplaceSet, Collections.singleton(replaceWith), notify);
- }
-
- private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith, boolean notify)
- {
- if (isOffline)
- return;
- dataTracker.replaceReaders(toReplace, replaceWith, notify);
+ dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith));
}
public void switchWriter(SSTableWriter newWriter)
@@ -318,7 +296,7 @@ public class SSTableRewriter
if (reader != null)
{
finishedOpenedEarly.add(reader);
- replaceReader(currentlyOpenedEarly, reader, false);
+ replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
moveStarts(reader, Functions.constant(reader.last), false);
}
finishedWriters.add(Pair.create(writer, reader));
@@ -327,38 +305,34 @@ public class SSTableRewriter
writer = newWriter;
}
- public void finish()
- {
- finish(-1);
- }
- public void finish(long repairedAt)
- {
- finish(true, repairedAt);
- }
- public void finish(boolean cleanupOldReaders)
+ public List<SSTableReader> finish()
{
- finish(cleanupOldReaders, -1);
+ return finish(-1);
}
/**
* Finishes the new file(s)
*
- * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the
- * old files as compacted if cleanupOldReaders is set to true. Otherwise it is up to the caller to do those gymnastics
- * (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+ * Creates final files, adds the new files to the dataTracker (via replaceReader).
+ *
+ * We add them to the tracker to be able to get rid of the tmpfiles
+ *
+ * It is up to the caller to do the compacted sstables replacement
+ * gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+ *
*
- * @param cleanupOldReaders if we should replace the old files with the new ones
* @param repairedAt the repair time, -1 if we should use the time we supplied when we created
* the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
* repair time.
*/
- public void finish(boolean cleanupOldReaders, long repairedAt)
+ public List<SSTableReader> finish(long repairedAt)
{
+ List<SSTableReader> finished = new ArrayList<>();
if (writer.getFilePointer() > 0)
{
SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
finished.add(reader);
- replaceReader(currentlyOpenedEarly, reader, false);
+ replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
moveStarts(reader, Functions.constant(reader.last), false);
}
else
@@ -373,7 +347,7 @@ public class SSTableRewriter
SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
finished.add(newReader);
// w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
- replaceReader(w.right, newReader, false);
+ replaceEarlyOpenedFile(w.right, newReader);
}
else
{
@@ -384,23 +358,7 @@ public class SSTableRewriter
if (!isOffline)
{
dataTracker.unmarkCompacting(finished);
- if (cleanupOldReaders)
- dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType);
}
- else if (cleanupOldReaders)
- {
- for (SSTableReader reader : rewriting)
- {
- reader.markObsolete();
- reader.releaseReference();
- }
- }
- isFinished = true;
- }
-
- public List<SSTableReader> finished()
- {
- assert isFinished;
return finished;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6e1ac5f..5ed4f4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
@@ -41,6 +43,9 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.junit.After;
import org.junit.Test;
@@ -89,7 +94,48 @@ public class AntiCompactionTest extends SchemaLoader
assertEquals(repairedKeys, 4);
assertEquals(nonRepairedKeys, 6);
}
-
+ @Test
+ public void antiCompactionSizeTest() throws ExecutionException, InterruptedException, IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.disableAutoCompaction();
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ long origSize = s.bytesOnDisk();
+ System.out.println(cfs.metric.liveDiskSpaceUsed.count());
+ Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
+ Collection<SSTableReader> sstables = cfs.getSSTables();
+ SSTableReader.acquireReferences(sstables);
+ CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345);
+ long sum = 0;
+ for (SSTableReader x : cfs.getSSTables())
+ sum += x.bytesOnDisk();
+ assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.count(), 100000);
+
+ }
+
+ private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+ {
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < count; i++)
+ cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ String filename = cfs.getTempSSTablePath(dir);
+
+ SSTableWriter writer = new SSTableWriter(filename,
+ 0,
+ 0,
+ cfs.metadata,
+ StorageService.getPartitioner(),
+ new MetadataCollector(cfs.metadata.comparator));
+
+ for (int i = 0; i < count * 5; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ return writer.closeAndOpenReader();
+ }
+
@Test
public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index b621c45..0a2b5a6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -428,7 +428,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
}
// don't leave replaced SSTRs around to break other tests
- cfs.getDataTracker().replaceReaders(Collections.singleton(original), Collections.singleton(sstable), true);
+ cfs.getDataTracker().replaceWithNewInstances(Collections.singleton(original), Collections.singleton(sstable));
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 7f85019..6f8ab62 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -412,7 +412,7 @@ public class SSTableReaderTest extends SchemaLoader
}
SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
- store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement), true);
+ store.getDataTracker().replaceWithNewInstances(Arrays.asList(sstable), Arrays.asList(replacement));
for (Future future : futures)
future.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 8b203ac..4d248bd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import com.google.common.collect.Sets;
import org.junit.Test;
@@ -40,6 +41,7 @@ import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.compaction.LazilyCompactedRow;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.assertEquals;
@@ -66,7 +68,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.forceBlockingFlush();
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
ICompactionScanner scanner = scanners.scanners.get(0);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -76,7 +78,7 @@ public class SSTableRewriterTest extends SchemaLoader
AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
writer.append(row);
}
- writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
validateCFS(cfs);
@@ -142,7 +144,7 @@ public class SSTableRewriterTest extends SchemaLoader
@Test
- public void testNumberOfFiles() throws InterruptedException
+ public void testNumberOfFilesAndSizes() throws InterruptedException
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -150,10 +152,10 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
-
+ long startStorageMetricsLoad = StorageMetrics.load.count();
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -167,13 +169,23 @@ public class SSTableRewriterTest extends SchemaLoader
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
files++;
assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+
}
}
- rewriter.finish();
- assertEquals(files, rewriter.finished().size());
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ long sum = 0;
+ for (SSTableReader x : cfs.getSSTables())
+ sum += x.bytesOnDisk();
+ assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count());
+ assertEquals(files, sstables.size());
assertEquals(files, cfs.getSSTables().size());
Thread.sleep(1000);
// tmplink and tmp files should be gone:
+ assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count());
assertFileCounts(s.descriptor.directory.list(), 0, 0);
validateCFS(cfs);
}
@@ -190,7 +202,7 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -206,10 +218,10 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
}
}
- rewriter.finish(false);
- assertEquals(files, rewriter.finished().size());
+ List<SSTableReader> sstables = rewriter.finish();
+ assertEquals(files, sstables.size());
assertEquals(files + 1, cfs.getSSTables().size());
- cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
assertEquals(files, cfs.getSSTables().size());
Thread.sleep(1000);
assertFileCounts(s.descriptor.directory.list(), 0, 0);
@@ -226,11 +238,12 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
+ long startSize = cfs.metric.liveDiskSpaceUsed.count();
DecoratedKey origFirst = s.first;
DecoratedKey origLast = s.last;
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -248,6 +261,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
rewriter.abort();
Thread.sleep(1000);
+ assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
assertEquals(1, cfs.getSSTables().size());
assertFileCounts(s.descriptor.directory.list(), 0, 0);
assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
@@ -270,7 +284,7 @@ public class SSTableRewriterTest extends SchemaLoader
DecoratedKey origLast = s.last;
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -313,7 +327,7 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -331,7 +345,8 @@ public class SSTableRewriterTest extends SchemaLoader
if (files == 3)
{
//testing to finish when we have nothing written in the new file
- rewriter.finish();
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
break;
}
}
@@ -353,7 +368,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -369,7 +384,8 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
}
}
- rewriter.finish();
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
Thread.sleep(1000);
assertFileCounts(s.descriptor.directory.list(), 0, 0);
cfs.truncateBlocking();
@@ -389,7 +405,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(1000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -406,8 +422,9 @@ public class SSTableRewriterTest extends SchemaLoader
files++;
}
}
- rewriter.finish();
- assertEquals(files, rewriter.finished().size());
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ assertEquals(files, sstables.size());
assertEquals(files, cfs.getSSTables().size());
Thread.sleep(1000);
assertFileCounts(s.descriptor.directory.list(), 0, 0);
[3/5] Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 0000000,4d248bd..58803c3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -1,0 -1,490 +1,504 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.io.sstable;
+
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.Arrays;
-import java.util.Collection;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import com.google.common.collect.Sets;
++import org.junit.After;
++import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.db.ArrayBackedSortedColumns;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+ import org.apache.cassandra.db.compaction.CompactionController;
+ import org.apache.cassandra.db.compaction.ICompactionScanner;
+ import org.apache.cassandra.db.compaction.LazilyCompactedRow;
+ import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
++import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.metrics.StorageMetrics;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+
+ public class SSTableRewriterTest extends SchemaLoader
+ {
- private static final String KEYSPACE = "Keyspace1";
++ private static final String KEYSPACE = "SSTableRewriterTest";
+ private static final String CF = "Standard1";
++
++ @BeforeClass
++ public static void defineSchema() throws ConfigurationException
++ {
++ SchemaLoader.prepareServer();
++ SchemaLoader.createKeyspace(KEYSPACE,
++ SimpleStrategy.class,
++ KSMetaData.optsWithRF(1),
++ SchemaLoader.standardCFMD(KEYSPACE, CF));
++ }
++
++ @After
++ public void truncateCF()
++ {
++ Keyspace keyspace = Keyspace.open(KEYSPACE);
++ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
++ store.truncateBlocking();
++ }
++
++
+ @Test
+ public void basicTest() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ for (int j = 0; j < 100; j ++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE, key);
+ rm.add(CF, Util.cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+ assertEquals(1, sstables.size());
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+ ICompactionScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while(scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+ writer.append(row);
+ }
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
+
+ validateCFS(cfs);
+
+ }
+
+
+ @Test
+ public void testFileRemoval() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- for (int i = 0; i < 1000; i++)
- cf.addColumn(Util.column(String.valueOf(i), "a", 1));
++ for (int i = 0; i < 100; i++)
++ cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ SSTableWriter writer = getWriter(cfs, dir);
-
+ for (int i = 0; i < 500; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s = writer.openEarly(1000);
+ assertFileCounts(dir.list(), 2, 3);
+ for (int i = 500; i < 1000; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s2 = writer.openEarly(1000);
+ assertTrue(s != s2);
+ assertFileCounts(dir.list(), 2, 3);
+ s.markObsolete();
+ s.releaseReference();
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 3);
+ writer.abort(false);
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testFileRemovalNoAbort() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < 1000; i++)
+ cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ SSTableWriter writer = getWriter(cfs, dir);
+
+ for (int i = 0; i < 500; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s = writer.openEarly(1000);
+ //assertFileCounts(dir.list(), 2, 3);
+ for (int i = 500; i < 1000; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ writer.closeAndOpenReader();
+ s.markObsolete();
+ s.releaseReference();
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+
+ @Test
+ public void testNumberOfFilesAndSizes() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ long startStorageMetricsLoad = StorageMetrics.load.count();
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+
+ }
+ }
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ long sum = 0;
+ for (SSTableReader x : cfs.getSSTables())
+ sum += x.bytesOnDisk();
+ assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count());
+ assertEquals(files, sstables.size());
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ // tmplink and tmp files should be gone:
+ assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count());
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ List<SSTableReader> sstables = rewriter.finish();
+ assertEquals(files, sstables.size());
+ assertEquals(files + 1, cfs.getSSTables().size());
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+
+ @Test
+ public void testNumberOfFiles_abort() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ long startSize = cfs.metric.liveDiskSpaceUsed.count();
+ DecoratedKey origFirst = s.first;
+ DecoratedKey origLast = s.last;
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ rewriter.abort();
+ Thread.sleep(1000);
+ assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(1, cfs.getSSTables().size());
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+ assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+ validateCFS(cfs);
+
+ }
+
+ @Test
+ public void testNumberOfFiles_abort2() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ DecoratedKey origFirst = s.first;
+ DecoratedKey origLast = s.last;
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ if (files == 3)
+ {
+ //testing to abort when we have nothing written in the new file
+ rewriter.abort();
+ break;
+ }
+ }
+ Thread.sleep(1000);
+ assertEquals(1, cfs.getSSTables().size());
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+
+ assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+ assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ if (files == 3)
+ {
+ //testing to finish when we have nothing written in the new file
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ break;
+ }
+ }
+ Thread.sleep(1000);
+ assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testNumberOfFiles_truncate() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ cfs.truncateBlocking();
+ validateCFS(cfs);
+ }
+
+ @Test
+ public void testSmallFiles() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ SSTableReader s = writeFile(cfs, 400);
+ DecoratedKey origFirst = s.first;
+ cfs.addSSTable(s);
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(1000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+ {
+ assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
+ assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ }
+ }
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ assertEquals(files, sstables.size());
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+ private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+ {
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < count / 100; i++)
+ cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ String filename = cfs.getTempSSTablePath(dir);
+
- SSTableWriter writer = new SSTableWriter(filename,
- 0,
- 0,
- cfs.metadata,
- StorageService.getPartitioner(),
- new MetadataCollector(cfs.metadata.comparator));
++ SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
+
+ for (int i = 0; i < count * 5; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ return writer.closeAndOpenReader();
+ }
+
+ private void validateCFS(ColumnFamilyStore cfs)
+ {
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ assertFalse(sstable.isMarkedCompacted());
+ assertEquals(1, sstable.referenceCount());
+ }
+ assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+ }
+
+
+ private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+ {
+ int tmplinkcount = 0;
+ int tmpcount = 0;
+ for (String f : files)
+ {
- if (f.contains("-tmplink-"))
++ if (f.contains("tmplink-"))
+ tmplinkcount++;
- if (f.contains("-tmp-"))
++ if (f.contains("tmp-"))
+ tmpcount++;
+ }
+ assertEquals(expectedtmplinkCount, tmplinkcount);
+ assertEquals(expectedtmpCount, tmpcount);
+ }
+
+ private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+ {
+ String filename = cfs.getTempSSTablePath(directory);
- return new SSTableWriter(filename,
- 0,
- 0,
- cfs.metadata,
- StorageService.getPartitioner(),
- new MetadataCollector(cfs.metadata.comparator));
++ return SSTableWriter.create(filename, 0, 0);
+ }
+ }
[5/5] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
src/java/org/apache/cassandra/db/compaction/Upgrader.java
src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f59629c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f59629c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f59629c
Branch: refs/heads/trunk
Commit: 0f59629ce280ba2a74d65a7719dde7cf79923f05
Parents: e60a06c 5160c91
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 3 17:02:10 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 17:02:10 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/db/DataTracker.java | 109 ++--
.../db/compaction/CompactionManager.java | 29 +-
.../cassandra/db/compaction/CompactionTask.java | 7 +-
.../cassandra/db/compaction/Scrubber.java | 12 +-
.../cassandra/db/compaction/Upgrader.java | 31 +-
.../io/sstable/IndexSummaryManager.java | 2 +-
.../cassandra/io/sstable/SSTableRewriter.java | 160 +++---
.../io/sstable/format/SSTableReader.java | 6 +
.../db/compaction/AntiCompactionTest.java | 42 +-
.../io/sstable/IndexSummaryManagerTest.java | 2 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 504 +++++++++++++++++++
13 files changed, 755 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3a8ada2,32083cc..9754110
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,37 -1,6 +1,39 @@@
+3.0
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * improve concurrency of repair (CASSANDRA-6455)
+
+
2.1.2
+ * Refactor how we track live size (CASSANDRA-7852)
+ * Make sure unfinished compaction files are removed (CASSANDRA-8124)
* Fix shutdown when run as Windows service (CASSANDRA-8136)
* Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
* Fix race in RecoveryManagerTest (CASSANDRA-8176)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3ee36cd,84c3cb5..cccb7f9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1045,76 -987,63 +1046,78 @@@ public class CompactionManager implemen
if (!new File(sstable.getFilename()).exists())
{
logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
+ i.remove();
continue;
}
+ if (groupMaxDataAge < sstable.maxDataAge)
+ groupMaxDataAge = sstable.maxDataAge;
+ }
+
+
+ if (anticompactionGroup.size() == 0)
+ {
+ logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
+ return 0;
+ }
- logger.info("Anticompacting {}", sstable);
- Set<SSTableReader> sstableAsSet = new HashSet<>();
- sstableAsSet.add(sstable);
+ logger.info("Anticompacting {}", anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
- File destination = cfs.directories.getDirectoryForNewSSTables();
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+ File destination = cfs.directories.getDirectoryForNewSSTables();
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
++ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
++ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
- CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
- {
- repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
- unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+ long repairedKeyCount = 0;
+ long unrepairedKeyCount = 0;
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
+ CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ {
+ int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- while(iter.hasNext())
+ repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+ unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
+
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat());
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ while(iter.hasNext())
+ {
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into the new repaired sstable
+ if (Range.isInRanges(row.key.getToken(), ranges))
{
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
+ {
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
}
- // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
- // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
- // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
- anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
- }
- catch (Throwable e)
- {
- JVMStabilityInspector.inspectThrowable(e);
- logger.error("Error anticompacting " + sstable, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
}
+ // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
+ // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
- repairedSSTableWriter.finish(false, repairedAt);
- unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
- // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
++ List<SSTableReader> anticompactedSSTables = new ArrayList<>();
++ anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
++ anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
++ cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
++
+ logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
+ repairedKeyCount + unrepairedKeyCount,
+ cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(),
+ anticompactionGroup);
- return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size();
++ return anticompactedSSTables.size();
}
- String format = "Repaired {} keys of {} for {}/{}";
- logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
- String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
-
- return anticompactedSSTables;
+ catch (Throwable e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ logger.error("Error anticompacting " + anticompactionGroup, e);
+ repairedSSTableWriter.abort();
+ unRepairedSSTableWriter.abort();
+ }
+ return 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 584ff38,b442482..808626b
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -158,9 -150,9 +158,9 @@@ public class CompactionTask extends Abs
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
Iterator<AbstractCompactedRow> iter = ci.iterator();
-
+ List<SSTableReader> newSStables;
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
long minRepairedAt = getMinRepairedAt(actuallyCompact);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Upgrader.java
index c9e7034,39f668d..52739de
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@@ -21,8 -21,9 +21,9 @@@ import java.io.File
import java.util.*;
import com.google.common.base.Throwables;
+ import com.google.common.collect.Sets;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.*;
@@@ -69,29 -67,24 +68,24 @@@ public class Upgrade
// Get the max timestamp of the precompacted sstables
// and adds generation of live ancestors
- // -- note that we always only have one SSTable in toUpgrade here:
- for (SSTableReader sstable : toUpgrade)
+ sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+ for (Integer i : sstable.getAncestors())
{
- sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
- for (Integer i : sstable.getAncestors())
- {
- if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
- sstableMetadataCollector.addAncestor(i);
- }
- sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
+ if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+ sstableMetadataCollector.addAncestor(i);
}
-
+ sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
- return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+ return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
}
public void upgrade()
{
outputHandler.output("Upgrading " + sstable);
-
- SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
- try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
+ Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
+ SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
{
- Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
+ Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 18825cb,4d5a06f..f3d08a6
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@@ -34,11 -35,9 +35,11 @@@ import org.apache.cassandra.db.DataTrac
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
- import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
/**
* Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb