You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/17 08:51:39 UTC
cassandra git commit: Make sure we don't give out positions from an
sstable beyond its first/last keys.
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 ffa806733 -> d5e5f9800
Make sure we don't give out positions from an sstable beyond its first/last keys.
Patch by marcuse; reviewed by belliottsmith for CASSANDRA-8458
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5e5f980
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5e5f980
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5e5f980
Branch: refs/heads/cassandra-2.1
Commit: d5e5f980093c20b42b89fec7bef8e31808fd37f6
Parents: ffa8067
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 12 15:50:12 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 17 08:44:57 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableReader.java | 29 ++++++----
.../io/sstable/SSTableRewriterTest.java | 61 ++++++++++++++++++++
3 files changed, 81 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d95e02e..410d49a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
* Disable mmap on Windows (CASSANDRA-6993)
* Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
* Add auth support to cassandra-stress (CASSANDRA-7985)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a8188ba..bd20226 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1354,20 +1354,29 @@ public class SSTableReader extends SSTable
List<Pair<Long,Long>> positions = new ArrayList<>();
for (Range<Token> range : Range.normalize(ranges))
{
- AbstractBounds<RowPosition> keyRange = range.toRowBounds();
- RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
- long left = idxLeft == null ? -1 : idxLeft.position;
- if (left == -1)
- // left is past the end of the file
+ assert !range.isWrapAround() || range.right.isMinimum();
+ // truncate the range so it at most covers the sstable
+ AbstractBounds<RowPosition> bounds = range.toRowBounds();
+ RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
+ RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
+
+ if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
continue;
- RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
- long right = idxRight == null ? -1 : idxRight.position;
- if (right == -1 || Range.isWrapAround(range.left, range.right))
- // right is past the end of the file, or it wraps
- right = uncompressedLength();
+
+ long left = getPosition(leftBound, Operator.GT).position;
+ long right = (rightBound.compareTo(last) > 0)
+ ? (openReason == OpenReason.EARLY
+ // if opened early, we overlap with the old sstables by one key, so we know that the last
+ // (and further) key(s) will be streamed from these if necessary
+ ? getPosition(last.getToken().maxKeyBound(), Operator.GT).position
+ : uncompressedLength())
+ : getPosition(rightBound, Operator.GT).position;
+
if (left == right)
// empty range
continue;
+
+ assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
positions.add(Pair.create(left, right));
}
return positions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index ecf97c3..6f9acea 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
import com.google.common.collect.Sets;
import org.junit.Test;
@@ -40,10 +41,13 @@ import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.LazilyCompactedRow;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -121,6 +125,63 @@ public class SSTableRewriterTest extends SchemaLoader
}
@Test
+ public void getPositionsTest() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+ assertEquals(1, sstables.size());
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ boolean checked = false;
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+ {
+ ISSTableScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while (scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+ writer.append(row);
+ if (!checked && writer.currentWriter().getFilePointer() > 15000000)
+ {
+ checked = true;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ if (sstable.openReason == SSTableReader.OpenReason.EARLY)
+ {
+ SSTableReader c = sstables.iterator().next();
+ long lastKeySize = sstable.getPosition(sstable.last, SSTableReader.Operator.GT).position - sstable.getPosition(sstable.last, SSTableReader.Operator.EQ).position;
+ Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken()));
+ List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r);
+ List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r);
+ assertEquals(1, tmplinkPositions.size());
+ assertEquals(1, compactingPositions.size());
+ assertEquals(0, tmplinkPositions.get(0).left.longValue());
+ // make sure we have one key overlap between the early opened file and the compacting one:
+ assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left + lastKeySize);
+ assertEquals(c.uncompressedLength(), compactingPositions.get(0).right.longValue());
+ }
+ }
+ }
+ }
+ }
+ assertTrue(checked);
+ Collection<SSTableReader> newsstables = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
+ Thread.sleep(100);
+ validateCFS(cfs);
+ int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+ assertEquals(1, filecounts);
+ cfs.truncateBlocking();
+ validateCFS(cfs);
+ }
+
+ @Test
public void testFileRemoval() throws InterruptedException
{
Keyspace keyspace = Keyspace.open(KEYSPACE);