You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/17 08:51:39 UTC

cassandra git commit: Make sure we don't give out positions from an sstable beyond its first/last keys.

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 ffa806733 -> d5e5f9800


Make sure we don't give out positions from an sstable beyond its first/last keys.

Patch by marcuse; reviewed by belliottsmith for CASSANDRA-8458


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5e5f980
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5e5f980
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5e5f980

Branch: refs/heads/cassandra-2.1
Commit: d5e5f980093c20b42b89fec7bef8e31808fd37f6
Parents: ffa8067
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 12 15:50:12 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 17 08:44:57 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableReader.java     | 29 ++++++----
 .../io/sstable/SSTableRewriterTest.java         | 61 ++++++++++++++++++++
 3 files changed, 81 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d95e02e..410d49a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
  * Disable mmap on Windows (CASSANDRA-6993)
  * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
  * Add auth support to cassandra-stress (CASSANDRA-7985)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a8188ba..bd20226 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1354,20 +1354,29 @@ public class SSTableReader extends SSTable
         List<Pair<Long,Long>> positions = new ArrayList<>();
         for (Range<Token> range : Range.normalize(ranges))
         {
-            AbstractBounds<RowPosition> keyRange = range.toRowBounds();
-            RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
-            long left = idxLeft == null ? -1 : idxLeft.position;
-            if (left == -1)
-                // left is past the end of the file
+            assert !range.isWrapAround() || range.right.isMinimum();
+            // truncate the range so it at most covers the sstable
+            AbstractBounds<RowPosition> bounds = range.toRowBounds();
+            RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
+            RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
+
+            if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
                 continue;
-            RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
-            long right = idxRight == null ? -1 : idxRight.position;
-            if (right == -1 || Range.isWrapAround(range.left, range.right))
-                // right is past the end of the file, or it wraps
-                right = uncompressedLength();
+
+            long left = getPosition(leftBound, Operator.GT).position;
+            long right = (rightBound.compareTo(last) > 0)
+                         ? (openReason == OpenReason.EARLY
+                            // if opened early, we overlap with the old sstables by one key, so we know that the last
+                            // (and further) key(s) will be streamed from these if necessary
+                            ? getPosition(last.getToken().maxKeyBound(), Operator.GT).position
+                            : uncompressedLength())
+                         : getPosition(rightBound, Operator.GT).position;
+
             if (left == right)
                 // empty range
                 continue;
+
+            assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
             positions.add(Pair.create(left, right));
         }
         return positions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index ecf97c3..6f9acea 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
@@ -40,10 +41,13 @@ import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.LazilyCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -121,6 +125,63 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
+    public void getPositionsTest() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+        assertEquals(1, sstables.size());
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+        boolean checked = false;
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+        {
+            ISSTableScanner scanner = scanners.scanners.get(0);
+            CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            while (scanner.hasNext())
+            {
+                AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+                writer.append(row);
+                if (!checked && writer.currentWriter().getFilePointer() > 15000000)
+                {
+                    checked = true;
+                    for (SSTableReader sstable : cfs.getSSTables())
+                    {
+                        if (sstable.openReason == SSTableReader.OpenReason.EARLY)
+                        {
+                            SSTableReader c = sstables.iterator().next();
+                            long lastKeySize = sstable.getPosition(sstable.last, SSTableReader.Operator.GT).position - sstable.getPosition(sstable.last, SSTableReader.Operator.EQ).position;
+                            Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken()));
+                            List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r);
+                            List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r);
+                            assertEquals(1, tmplinkPositions.size());
+                            assertEquals(1, compactingPositions.size());
+                            assertEquals(0, tmplinkPositions.get(0).left.longValue());
+                            // make sure we have one key overlap between the early opened file and the compacting one:
+                            assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left + lastKeySize);
+                            assertEquals(c.uncompressedLength(), compactingPositions.get(0).right.longValue());
+                        }
+                    }
+                }
+            }
+        }
+        assertTrue(checked);
+        Collection<SSTableReader> newsstables = writer.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
+        Thread.sleep(100);
+        validateCFS(cfs);
+        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+        assertEquals(1, filecounts);
+        cfs.truncateBlocking();
+        validateCFS(cfs);
+    }
+
+    @Test
     public void testFileRemoval() throws InterruptedException
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);