You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2016/02/12 00:55:13 UTC
[1/3] cassandra git commit: Always persist upsampled index summaries
Repository: cassandra
Updated Branches:
refs/heads/trunk bc8e878ec -> db49d3b89
Always persist upsampled index summaries
Patch by Ariel Weisberg; reviewed by Tyler Hobbs for CASSANDRA-10512
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5c83f49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5c83f49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5c83f49
Branch: refs/heads/trunk
Commit: d5c83f49148ad5f515b19364945260594dc3d27c
Parents: af6bd1b
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Thu Feb 11 17:53:02 2016 -0600
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Thu Feb 11 17:53:02 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/sstable/format/SSTableReader.java | 18 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 10 +
.../apache/cassandra/io/util/SegmentedFile.java | 10 +
.../cassandra/io/sstable/SSTableReaderTest.java | 541 ----------------
.../io/sstable/format/SSTableReaderTest.java | 648 +++++++++++++++++++
6 files changed, 681 insertions(+), 547 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1a4717d..fa25980 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.6
+ * Always persist upsampled index summaries (CASSANDRA-10512)
* (cqlsh) Fix inconsistent auto-complete (CASSANDRA-10733)
* Make SELECT JSON and toJson() threadsafe (CASSANDRA-11048)
* Fix SELECT on tuple relations for mixed ASC/DESC clustering order (CASSANDRA-7281)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 27ac87c..e81e4e9 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1155,12 +1155,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
// we can use the existing index summary to make a smaller one
newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
-
- try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
- SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
- {
- saveSummary(ibuilder, dbuilder, newSummary);
- }
}
else
{
@@ -1168,6 +1162,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
"no adjustments to min/max_index_interval");
}
+ //Always save the resampled index
+ try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
+ SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
+ {
+ for (long boundry : dfile.copyReadableBounds())
+ dbuilder.addPotentialBoundary(boundry);
+ for (long boundry : ifile.copyReadableBounds())
+ ibuilder.addPotentialBoundary(boundry);
+
+ saveSummary(ibuilder, dbuilder, newSummary);
+ }
+
long newSize = bytesOnDisk();
StorageMetrics.load.inc(newSize - oldSize);
parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 01f8370..70ac77a 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -102,6 +102,16 @@ public class MmappedSegmentedFile extends SegmentedFile
return file;
}
+ @Override
+ public long[] copyReadableBounds()
+ {
+ long[] bounds = new long[segments.length + 1];
+ for (int i = 0; i < segments.length; i++)
+ bounds[i] = segments[i].left;
+ bounds[segments.length] = length;
+ return bounds;
+ }
+
private static final class Cleanup extends SegmentedFile.Cleanup
{
final Segment[] segments;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 553cc0d..cb331de 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -155,6 +155,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl
}
/**
+ * Retrieve the readable bounds if any so they can be cloned into other files such
+ * as when downsampling an index summary. Readable bounds are in between record locations in a file
+ * that are good positions for mapping the file such that records don't cross mappings.
+ */
+ public long[] copyReadableBounds()
+ {
+ return new long[0];
+ }
+
+ /**
* Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
*/
public static abstract class Builder implements AutoCloseable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
deleted file mode 100644
index 682d999..0000000
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ /dev/null
@@ -1,541 +0,0 @@
-package org.apache.cassandra.io.sstable;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import com.google.common.collect.Sets;
-import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.BufferDecoratedKey;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IndexExpression;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.composites.Composites;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.dht.LocalPartitioner.LocalToken;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.MmappedSegmentedFile;
-import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import static org.apache.cassandra.Util.cellname;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(OrderedJUnit4ClassRunner.class)
-public class SSTableReaderTest
-{
- public static final String KEYSPACE1 = "SSTableReaderTest";
- public static final String CF_STANDARD = "Standard1";
- public static final String CF_STANDARD2 = "Standard2";
- public static final String CF_INDEXED = "Indexed1";
- public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
-
- static Token t(int i)
- {
- return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i)));
- }
-
- @BeforeClass
- public static void defineSchema() throws Exception
- {
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- SimpleStrategy.class,
- KSMetaData.optsWithRF(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
- SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEXED, true),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLOWINDEXINTERVAL)
- .minIndexInterval(8)
- .maxIndexInterval(256)
- .caching(CachingOptions.NONE));
- }
-
- @Test
- public void testGetPositionsForRanges()
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
-
- // insert data and compact to a single sstable
- CompactionManager.instance.disableAutoCompaction();
- for (int j = 0; j < 10; j++)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
- rm.applyUnsafe();
- }
- store.forceBlockingFlush();
- CompactionManager.instance.performMaximal(store, false);
-
- List<Range<Token>> ranges = new ArrayList<Range<Token>>();
- // 1 key
- ranges.add(new Range<Token>(t(0), t(1)));
- // 2 keys
- ranges.add(new Range<Token>(t(2), t(4)));
- // wrapping range from key to end
- ranges.add(new Range<Token>(t(6), StorageService.getPartitioner().getMinimumToken()));
- // empty range (should be ignored)
- ranges.add(new Range<Token>(t(9), t(91)));
-
- // confirm that positions increase continuously
- SSTableReader sstable = store.getSSTables().iterator().next();
- long previous = -1;
- for (Pair<Long,Long> section : sstable.getPositionsForRanges(ranges))
- {
- assert previous <= section.left : previous + " ! < " + section.left;
- assert section.left < section.right : section.left + " ! < " + section.right;
- previous = section.right;
- }
- }
-
- @Test
- public void testSpannedIndexPositions() throws IOException
- {
- MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
-
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
-
- // insert a bunch of data and compact to a single sstable
- CompactionManager.instance.disableAutoCompaction();
- for (int j = 0; j < 100; j += 2)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
- rm.applyUnsafe();
- }
- store.forceBlockingFlush();
- CompactionManager.instance.performMaximal(store, false);
-
- // check that all our keys are found correctly
- SSTableReader sstable = store.getSSTables().iterator().next();
- for (int j = 0; j < 100; j += 2)
- {
- DecoratedKey dk = Util.dk(String.valueOf(j));
- FileDataInput file = sstable.getFileDataInput(sstable.getPosition(dk, SSTableReader.Operator.EQ).position);
- DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
- assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath());
- }
-
- // check no false positives
- for (int j = 1; j < 110; j += 2)
- {
- DecoratedKey dk = Util.dk(String.valueOf(j));
- assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == null;
- }
- }
-
- @Test
- public void testPersistentStatistics()
- {
-
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
-
- for (int j = 0; j < 100; j += 2)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
- rm.applyUnsafe();
- }
- store.forceBlockingFlush();
-
- clearAndLoad(store);
- assert store.metric.maxRowSize.getValue() != 0;
- }
-
- private void clearAndLoad(ColumnFamilyStore cfs)
- {
- cfs.clearUnsafe();
- cfs.loadNewSSTables();
- }
-
- @Test
- public void testReadRateTracking()
- {
- // try to make sure CASSANDRA-8239 never happens again
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
-
- for (int j = 0; j < 10; j++)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
- rm.apply();
- }
- store.forceBlockingFlush();
-
- SSTableReader sstable = store.getSSTables().iterator().next();
- assertEquals(0, sstable.getReadMeter().count());
-
- DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.bytes("4"));
- store.getColumnFamily(key, Composites.EMPTY, Composites.EMPTY, false, 100, 100);
- assertEquals(1, sstable.getReadMeter().count());
- store.getColumnFamily(key, cellname("0"), cellname("0"), false, 100, 100);
- assertEquals(2, sstable.getReadMeter().count());
- store.getColumnFamily(Util.namesQueryFilter(store, key, cellname("0")));
- assertEquals(3, sstable.getReadMeter().count());
- }
-
- @Test
- public void testGetPositionsForRangesWithKeyCache()
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
- CacheService.instance.keyCache.setCapacity(100);
-
- // insert data and compact to a single sstable
- CompactionManager.instance.disableAutoCompaction();
- for (int j = 0; j < 10; j++)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
- rm.applyUnsafe();
- }
- store.forceBlockingFlush();
- CompactionManager.instance.performMaximal(store, false);
-
- SSTableReader sstable = store.getSSTables().iterator().next();
- long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ).position;
- long p3 = sstable.getPosition(k(3), SSTableReader.Operator.EQ).position;
- long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position;
- long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position;
-
- Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0);
-
- // range are start exclusive so we should start at 3
- assert p.left == p3;
-
- // to capture 6 we have to stop at the start of 7
- assert p.right == p7;
- }
-
- @Test
- public void testPersistentStatisticsWithSecondaryIndex()
- {
- // Create secondary index and flush to disk
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1");
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1"));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(1L), System.currentTimeMillis());
- rm.applyUnsafe();
- store.forceBlockingFlush();
-
- // check if opening and querying works
- assertIndexQueryWorks(store);
- }
- public void testGetPositionsKeyCacheStats()
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
- CacheService.instance.keyCache.setCapacity(1000);
-
- // insert data and compact to a single sstable
- CompactionManager.instance.disableAutoCompaction();
- for (int j = 0; j < 10; j++)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
- Mutation rm = new Mutation("Keyspace1", key);
- rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
- rm.apply();
- }
- store.forceBlockingFlush();
- CompactionManager.instance.performMaximal(store, false);
-
- SSTableReader sstable = store.getSSTables().iterator().next();
- sstable.getPosition(k(2), SSTableReader.Operator.EQ);
- assertEquals(0, sstable.getKeyCacheHit());
- assertEquals(1, sstable.getBloomFilterTruePositiveCount());
- sstable.getPosition(k(2), SSTableReader.Operator.EQ);
- assertEquals(1, sstable.getKeyCacheHit());
- assertEquals(2, sstable.getBloomFilterTruePositiveCount());
- sstable.getPosition(k(15), SSTableReader.Operator.EQ);
- assertEquals(1, sstable.getKeyCacheHit());
- assertEquals(2, sstable.getBloomFilterTruePositiveCount());
-
- }
-
-
- @Test
- public void testOpeningSSTable() throws Exception
- {
- String ks = KEYSPACE1;
- String cf = "Standard1";
-
- // clear and create just one sstable for this test
- Keyspace keyspace = Keyspace.open(ks);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
- store.clearUnsafe();
- store.disableAutoCompaction();
-
- DecoratedKey firstKey = null, lastKey = null;
- long timestamp = System.currentTimeMillis();
- for (int i = 0; i < store.metadata.getMinIndexInterval(); i++)
- {
- DecoratedKey key = Util.dk(String.valueOf(i));
- if (firstKey == null)
- firstKey = key;
- if (lastKey == null)
- lastKey = key;
- if (store.metadata.getKeyValidator().compare(lastKey.getKey(), key.getKey()) < 0)
- lastKey = key;
- Mutation rm = new Mutation(ks, key.getKey());
- rm.add(cf, cellname("col"),
- ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp);
- rm.applyUnsafe();
- }
- store.forceBlockingFlush();
-
- SSTableReader sstable = store.getSSTables().iterator().next();
- Descriptor desc = sstable.descriptor;
-
- // test to see if sstable can be opened as expected
- SSTableReader target = SSTableReader.open(desc);
- Assert.assertEquals(target.getIndexSummarySize(), 1);
- Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.getKey()), target.getIndexSummaryKey(0));
- assert target.first.equals(firstKey);
- assert target.last.equals(lastKey);
- target.selfRef().release();
- }
-
- @Test
- public void testLoadingSummaryUsesCorrectPartitioner() throws Exception
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1");
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1"));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(1L), System.currentTimeMillis());
- rm.applyUnsafe();
- store.forceBlockingFlush();
-
- ColumnFamilyStore indexCfs = store.indexManager.getIndexForColumn(ByteBufferUtil.bytes("birthdate")).getIndexCfs();
- assert indexCfs.partitioner instanceof LocalPartitioner;
- SSTableReader sstable = indexCfs.getSSTables().iterator().next();
- assert sstable.first.getToken() instanceof LocalToken;
-
- try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
- SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), sstable.compression))
- {
- sstable.saveSummary(ibuilder, dbuilder);
- }
- SSTableReader reopened = SSTableReader.open(sstable.descriptor);
- assert reopened.first.getToken() instanceof LocalToken;
- reopened.selfRef().release();
- }
-
- /** see CASSANDRA-5407 */
- @Test
- public void testGetScannerForNoIntersectingRanges() throws Exception
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1"));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Standard1", cellname("xyz"), ByteBufferUtil.bytes("abc"), 0);
- rm.applyUnsafe();
- store.forceBlockingFlush();
- boolean foundScanner = false;
- for (SSTableReader s : store.getSSTables())
- {
- try (ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null))
- {
- scanner.next(); // throws exception pre 5407
- foundScanner = true;
- }
- }
- assertTrue(foundScanner);
- }
-
- @Test
- public void testGetPositionsForRangesFromTableOpenedForBulkLoading() throws IOException
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
-
- // insert data and compact to a single sstable. The
- // number of keys inserted is greater than index_interval
- // to ensure multiple segments in the index file
- CompactionManager.instance.disableAutoCompaction();
- for (int j = 0; j < 130; j++)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
- rm.applyUnsafe();
- }
- store.forceBlockingFlush();
- CompactionManager.instance.performMaximal(store, false);
-
- // construct a range which is present in the sstable, but whose
- // keys are not found in the first segment of the index.
- List<Range<Token>> ranges = new ArrayList<Range<Token>>();
- ranges.add(new Range<Token>(t(98), t(99)));
-
- SSTableReader sstable = store.getSSTables().iterator().next();
- List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
- assert sections.size() == 1 : "Expected to find range in sstable" ;
-
- // re-open the same sstable as it would be during bulk loading
- Set<Component> components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
- if (sstable.components.contains(Component.COMPRESSION_INFO))
- components.add(Component.COMPRESSION_INFO);
- SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, store.metadata, sstable.partitioner);
- sections = bulkLoaded.getPositionsForRanges(ranges);
- assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading";
- bulkLoaded.selfRef().release();
- }
-
- @Test
- public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching
- CompactionManager.instance.disableAutoCompaction();
-
- final int NUM_ROWS = 512;
- for (int j = 0; j < NUM_ROWS; j++)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j));
- Mutation rm = new Mutation(KEYSPACE1, key);
- rm.add("StandardLowIndexInterval", Util.cellname("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j);
- rm.applyUnsafe();
- }
- store.forceBlockingFlush();
- CompactionManager.instance.performMaximal(store, false);
-
- Collection<SSTableReader> sstables = store.getSSTables();
- assert sstables.size() == 1;
- final SSTableReader sstable = sstables.iterator().next();
-
- ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
- List<Future> futures = new ArrayList<>(NUM_ROWS * 2);
- for (int i = 0; i < NUM_ROWS; i++)
- {
- final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", i));
- final int index = i;
-
- futures.add(executor.submit(new Runnable()
- {
- public void run()
- {
- ColumnFamily result = store.getColumnFamily(sstable.partitioner.decorateKey(key), Composites.EMPTY, Composites.EMPTY, false, 100, 100);
- assertFalse(result.isEmpty());
- assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), result.getColumn(Util.cellname("0")).value()));
- }
- }));
-
- futures.add(executor.submit(new Runnable()
- {
- public void run()
- {
- Iterable<DecoratedKey> results = store.keySamples(
- new Range<>(sstable.partitioner.getMinimumToken(), sstable.partitioner.getToken(key)));
- assertTrue(results.iterator().hasNext());
- }
- }));
- }
-
- SSTableReader replacement;
- try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN))
- {
- replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
- txn.update(replacement, true);
- txn.finish();
- }
- for (Future future : futures)
- future.get();
-
- assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1);
- }
-
- private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS)
- {
- assert "Indexed1".equals(indexedCFS.name);
-
- // make sure all sstables including 2ary indexes load from disk
- for (ColumnFamilyStore cfs : indexedCFS.concatWithIndexes())
- clearAndLoad(cfs);
-
- // query using index to see if sstable for secondary index opens
- IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), Operator.EQ, ByteBufferUtil.bytes(1L));
- List<IndexExpression> clause = Arrays.asList(expr);
- Range<RowPosition> range = Util.range("", "");
- List<Row> rows = indexedCFS.search(range, clause, new IdentityQueryFilter(), 100);
- assert rows.size() == 1;
- }
-
- private List<Range<Token>> makeRanges(Token left, Token right)
- {
- return Arrays.asList(new Range<>(left, right));
- }
-
- private DecoratedKey k(int i)
- {
- return new BufferDecoratedKey(t(i), ByteBufferUtil.bytes(String.valueOf(i)));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5c83f49/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java
new file mode 100644
index 0000000..6d07f1c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java
@@ -0,0 +1,648 @@
+package org.apache.cassandra.io.sstable.format;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.google.common.collect.Sets;
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner.LocalToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class SSTableReaderTest
+{
+ public static final String KEYSPACE1 = "SSTableReaderTest";
+ public static final String CF_STANDARD = "Standard1";
+ public static final String CF_STANDARD2 = "Standard2";
+ public static final String CF_INDEXED = "Indexed1";
+ public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+ static Token t(int i)
+ {
+ return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i)));
+ }
+
+ @BeforeClass
+ public static void defineSchema() throws Exception
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
+ SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEXED, true),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLOWINDEXINTERVAL)
+ .minIndexInterval(8)
+ .maxIndexInterval(256)
+ .caching(CachingOptions.NONE));
+ }
+
+ @Test
+ public void testGetPositionsForRanges()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+ // 1 key
+ ranges.add(new Range<Token>(t(0), t(1)));
+ // 2 keys
+ ranges.add(new Range<Token>(t(2), t(4)));
+ // wrapping range from key to end
+ ranges.add(new Range<Token>(t(6), StorageService.getPartitioner().getMinimumToken()));
+ // empty range (should be ignored)
+ ranges.add(new Range<Token>(t(9), t(91)));
+
+ // confirm that positions increase continuously
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ long previous = -1;
+ for (Pair<Long,Long> section : sstable.getPositionsForRanges(ranges))
+ {
+ assert previous <= section.left : previous + " ! < " + section.left;
+ assert section.left < section.right : section.left + " ! < " + section.right;
+ previous = section.right;
+ }
+ }
+
+ @Test
+ public void testSpannedIndexPositions() throws IOException
+ {
+ long originalMaxSegmentSize = MmappedSegmentedFile.MAX_SEGMENT_SIZE;
+ MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
+
+ try
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+
+ // insert a bunch of data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 100; j += 2)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ // check that all our keys are found correctly
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ for (int j = 0; j < 100; j += 2)
+ {
+ DecoratedKey dk = Util.dk(String.valueOf(j));
+ FileDataInput file = sstable.getFileDataInput(sstable.getPosition(dk, SSTableReader.Operator.EQ).position);
+ DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
+ assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath());
+ }
+
+ // check no false positives
+ for (int j = 1; j < 110; j += 2)
+ {
+ DecoratedKey dk = Util.dk(String.valueOf(j));
+ assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == null;
+ }
+ }
+ finally
+ {
+ MmappedSegmentedFile.MAX_SEGMENT_SIZE = originalMaxSegmentSize;
+ }
+ }
+
+ @Test
+ public void testPersistentStatistics()
+ {
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+
+ for (int j = 0; j < 100; j += 2)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+
+ clearAndLoad(store);
+ assert store.metric.maxRowSize.getValue() != 0;
+ }
+
+ private void clearAndLoad(ColumnFamilyStore cfs)
+ {
+ cfs.clearUnsafe();
+ cfs.loadNewSSTables();
+ }
+
+ @Test
+ public void testReadRateTracking()
+ {
+ // try to make sure CASSANDRA-8239 never happens again
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+
+ for (int j = 0; j < 10; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ assertEquals(0, sstable.getReadMeter().count());
+
+ DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.bytes("4"));
+ store.getColumnFamily(key, Composites.EMPTY, Composites.EMPTY, false, 100, 100);
+ assertEquals(1, sstable.getReadMeter().count());
+ store.getColumnFamily(key, cellname("0"), cellname("0"), false, 100, 100);
+ assertEquals(2, sstable.getReadMeter().count());
+ store.getColumnFamily(Util.namesQueryFilter(store, key, cellname("0")));
+ assertEquals(3, sstable.getReadMeter().count());
+ }
+
+ @Test
+ public void testGetPositionsForRangesWithKeyCache()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ CacheService.instance.keyCache.setCapacity(100);
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ).position;
+ long p3 = sstable.getPosition(k(3), SSTableReader.Operator.EQ).position;
+ long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position;
+ long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position;
+
+ Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0);
+
+ // range are start exclusive so we should start at 3
+ assert p.left == p3;
+
+ // to capture 6 we have to stop at the start of 7
+ assert p.right == p7;
+ }
+
+ @Test
+ public void testPersistentStatisticsWithSecondaryIndex()
+ {
+ // Create secondary index and flush to disk
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1");
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1"));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(1L), System.currentTimeMillis());
+ rm.applyUnsafe();
+ store.forceBlockingFlush();
+
+ // check if opening and querying works
+ assertIndexQueryWorks(store);
+ }
+ public void testGetPositionsKeyCacheStats()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ CacheService.instance.keyCache.setCapacity(1000);
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation("Keyspace1", key);
+ rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ sstable.getPosition(k(2), SSTableReader.Operator.EQ);
+ assertEquals(0, sstable.getKeyCacheHit());
+ assertEquals(1, sstable.getBloomFilterTruePositiveCount());
+ sstable.getPosition(k(2), SSTableReader.Operator.EQ);
+ assertEquals(1, sstable.getKeyCacheHit());
+ assertEquals(2, sstable.getBloomFilterTruePositiveCount());
+ sstable.getPosition(k(15), SSTableReader.Operator.EQ);
+ assertEquals(1, sstable.getKeyCacheHit());
+ assertEquals(2, sstable.getBloomFilterTruePositiveCount());
+
+ }
+
+
+ @Test
+ public void testOpeningSSTable() throws Exception
+ {
+ String ks = KEYSPACE1;
+ String cf = "Standard1";
+
+ // clear and create just one sstable for this test
+ Keyspace keyspace = Keyspace.open(ks);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
+ store.clearUnsafe();
+ store.disableAutoCompaction();
+
+ DecoratedKey firstKey = null, lastKey = null;
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < store.metadata.getMinIndexInterval(); i++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(i));
+ if (firstKey == null)
+ firstKey = key;
+ if (lastKey == null)
+ lastKey = key;
+ if (store.metadata.getKeyValidator().compare(lastKey.getKey(), key.getKey()) < 0)
+ lastKey = key;
+ Mutation rm = new Mutation(ks, key.getKey());
+ rm.add(cf, cellname("col"),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ Descriptor desc = sstable.descriptor;
+
+ // test to see if sstable can be opened as expected
+ SSTableReader target = SSTableReader.open(desc);
+ Assert.assertEquals(target.getIndexSummarySize(), 1);
+ Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.getKey()), target.getIndexSummaryKey(0));
+ assert target.first.equals(firstKey);
+ assert target.last.equals(lastKey);
+ target.selfRef().release();
+ }
+
+ @Test
+ public void testLoadingSummaryUsesCorrectPartitioner() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1");
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1"));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(1L), System.currentTimeMillis());
+ rm.applyUnsafe();
+ store.forceBlockingFlush();
+
+ ColumnFamilyStore indexCfs = store.indexManager.getIndexForColumn(ByteBufferUtil.bytes("birthdate")).getIndexCfs();
+ assert indexCfs.partitioner instanceof LocalPartitioner;
+ SSTableReader sstable = indexCfs.getSSTables().iterator().next();
+ assert sstable.first.getToken() instanceof LocalToken;
+
+ try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
+ SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), sstable.compression))
+ {
+ sstable.saveSummary(ibuilder, dbuilder);
+ }
+ SSTableReader reopened = SSTableReader.open(sstable.descriptor);
+ assert reopened.first.getToken() instanceof LocalToken;
+ reopened.selfRef().release();
+ }
+
+ /** see CASSANDRA-5407 */
+ @Test
+ public void testGetScannerForNoIntersectingRanges() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf("k1"));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Standard1", cellname("xyz"), ByteBufferUtil.bytes("abc"), 0);
+ rm.applyUnsafe();
+ store.forceBlockingFlush();
+ boolean foundScanner = false;
+ for (SSTableReader s : store.getSSTables())
+ {
+ try (ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null))
+ {
+ scanner.next(); // throws exception pre 5407
+ foundScanner = true;
+ }
+ }
+ assertTrue(foundScanner);
+ }
+
+ @Test
+ public void testGetPositionsForRangesFromTableOpenedForBulkLoading() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+
+ // insert data and compact to a single sstable. The
+ // number of keys inserted is greater than index_interval
+ // to ensure multiple segments in the index file
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 130; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("Standard2", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ // construct a range which is present in the sstable, but whose
+ // keys are not found in the first segment of the index.
+ List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+ ranges.add(new Range<Token>(t(98), t(99)));
+
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
+ assert sections.size() == 1 : "Expected to find range in sstable" ;
+
+ // re-open the same sstable as it would be during bulk loading
+ Set<Component> components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
+ if (sstable.compression)
+ components.add(Component.COMPRESSION_INFO);
+ SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, store.metadata, sstable.partitioner);
+ sections = bulkLoaded.getPositionsForRanges(ranges);
+ assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading";
+ bulkLoaded.selfRef().release();
+ }
+
+ @Test
+ public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching
+ CompactionManager.instance.disableAutoCompaction();
+
+ final int NUM_ROWS = 512;
+ for (int j = 0; j < NUM_ROWS; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("StandardLowIndexInterval", Util.cellname("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ Collection<SSTableReader> sstables = store.getSSTables();
+ assert sstables.size() == 1;
+ final SSTableReader sstable = sstables.iterator().next();
+
+ ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
+ List<Future> futures = new ArrayList<>(NUM_ROWS * 2);
+ for (int i = 0; i < NUM_ROWS; i++)
+ {
+ final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", i));
+ final int index = i;
+
+ futures.add(executor.submit(new Runnable()
+ {
+ public void run()
+ {
+ ColumnFamily result = store.getColumnFamily(sstable.partitioner.decorateKey(key), Composites.EMPTY, Composites.EMPTY, false, 100, 100);
+ assertFalse(result.isEmpty());
+ assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), result.getColumn(Util.cellname("0")).value()));
+ }
+ }));
+
+ futures.add(executor.submit(new Runnable()
+ {
+ public void run()
+ {
+ Iterable<DecoratedKey> results = store.keySamples(
+ new Range<>(sstable.partitioner.getMinimumToken(), sstable.partitioner.getToken(key)));
+ assertTrue(results.iterator().hasNext());
+ }
+ }));
+ }
+
+ SSTableReader replacement;
+ try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN))
+ {
+ replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
+ txn.update(replacement, true);
+ txn.finish();
+ }
+ for (Future future : futures)
+ future.get();
+
+ assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1);
+ }
+
+ @Test
+ public void testIndexSummaryUpsampleAndReload() throws Exception
+ {
+ long originalMaxSegmentSize = MmappedSegmentedFile.MAX_SEGMENT_SIZE;
+ MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
+
+ try
+ {
+ testIndexSummaryUpsampleAndReload0();
+ }
+ finally
+ {
+ MmappedSegmentedFile.MAX_SEGMENT_SIZE = originalMaxSegmentSize;
+ }
+ }
+
+ private void testIndexSummaryUpsampleAndReload0() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching
+ CompactionManager.instance.disableAutoCompaction();
+
+ final int NUM_ROWS = 512;
+ for (int j = 0; j < NUM_ROWS; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("StandardLowIndexInterval", Util.cellname("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ Collection<SSTableReader> sstables = store.getSSTables();
+ assert sstables.size() == 1;
+ final SSTableReader sstable = sstables.iterator().next();
+
+ try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN))
+ {
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, sstable.getIndexSummarySamplingLevel() + 1);
+ txn.update(replacement, true);
+ txn.finish();
+ }
+ SSTableReader reopen = SSTableReader.open(sstable.descriptor);
+ assert reopen.getIndexSummarySamplingLevel() == sstable.getIndexSummarySamplingLevel() + 1;
+ }
+
+ @Test
+ public void testIndexSummaryDownsampleAndReload() throws Exception
+ {
+ long originalMaxSegmentSize = MmappedSegmentedFile.MAX_SEGMENT_SIZE;
+ MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
+
+ try
+ {
+ testIndexSummaryDownsampleAndReload0();
+ }
+ finally
+ {
+ MmappedSegmentedFile.MAX_SEGMENT_SIZE = originalMaxSegmentSize;
+ }
+ }
+
+ private void testIndexSummaryDownsampleAndReload0() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching
+ CompactionManager.instance.disableAutoCompaction();
+
+ final int NUM_ROWS = 512;
+ for (int j = 0; j < NUM_ROWS; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j));
+ Mutation rm = new Mutation(KEYSPACE1, key);
+ rm.add("StandardLowIndexInterval", Util.cellname("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j);
+ rm.applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ Collection<SSTableReader> sstables = store.getSSTables();
+ assert sstables.size() == 1;
+ final SSTableReader sstable = sstables.iterator().next();
+
+ try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN))
+ {
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, sstable.getIndexSummarySamplingLevel() / 2);
+ txn.update(replacement, true);
+ txn.finish();
+ }
+ SSTableReader reopen = SSTableReader.open(sstable.descriptor);
+ assert Arrays.equals(sstable.ifile.copyReadableBounds(), reopen.ifile.copyReadableBounds());
+ assert Arrays.equals(sstable.dfile.copyReadableBounds(), reopen.dfile.copyReadableBounds());
+ }
+
+
+ private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS)
+ {
+ assert "Indexed1".equals(indexedCFS.name);
+
+ // make sure all sstables including 2ary indexes load from disk
+ for (ColumnFamilyStore cfs : indexedCFS.concatWithIndexes())
+ clearAndLoad(cfs);
+
+ // query using index to see if sstable for secondary index opens
+ IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), Operator.EQ, ByteBufferUtil.bytes(1L));
+ List<IndexExpression> clause = Arrays.asList(expr);
+ Range<RowPosition> range = Util.range("", "");
+ List<Row> rows = indexedCFS.search(range, clause, new IdentityQueryFilter(), 100);
+ assert rows.size() == 1;
+ }
+
+ private List<Range<Token>> makeRanges(Token left, Token right)
+ {
+ return Arrays.asList(new Range<>(left, right));
+ }
+
+ private DecoratedKey k(int i)
+ {
+ return new BufferDecoratedKey(t(i), ByteBufferUtil.bytes(String.valueOf(i)));
+ }
+}
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by ty...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db49d3b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db49d3b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db49d3b8
Branch: refs/heads/trunk
Commit: db49d3b89db3ab8a37b643d7e20938ffce1fbc9c
Parents: bc8e878 d4e6f08
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Thu Feb 11 17:54:30 2016 -0600
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Thu Feb 11 17:54:30 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/sstable/format/SSTableReader.java | 13 ++-
.../cassandra/io/sstable/SSTableReaderTest.java | 113 ++++++++++++++-----
3 files changed, 93 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db49d3b8/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db49d3b8/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db49d3b8/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
[2/3] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by ty...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4e6f08d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4e6f08d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4e6f08d
Branch: refs/heads/trunk
Commit: d4e6f08d48317796bfda8691f85c038fcd264769
Parents: 604c9df d5c83f4
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Thu Feb 11 17:54:22 2016 -0600
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Thu Feb 11 17:54:22 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/sstable/format/SSTableReader.java | 13 ++-
.../cassandra/io/sstable/SSTableReaderTest.java | 113 ++++++++++++++-----
3 files changed, 93 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4e6f08d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7a0357f,fa25980..5156b0c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,5 +1,16 @@@
-2.2.6
+3.0.4
+ * Properly handle hinted handoff after topology changes (CASSANDRA-5902)
+ * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156)
+ * Fix wrong rack counting and invalid conditions check for TokenAllocation
+ (CASSANDRA-11139)
+ * Avoid creating empty hint files (CASSANDRA-11090)
+ * Fix leak detection strong reference loop using weak reference (CASSANDRA-11120)
+ * Configurie BatchlogManager to stop delayed tasks on shutdown (CASSANDRA-11062)
+ * Hadoop integration is incompatible with Cassandra Driver 3.0.0 (CASSANDRA-11001)
+ * Add dropped_columns to the list of schema table so it gets handled
+ properly (CASSANDRA-11050)
+Merged from 2.2:
+ * Always persist upsampled index summaries (CASSANDRA-10512)
* (cqlsh) Fix inconsistent auto-complete (CASSANDRA-10733)
* Make SELECT JSON and toJson() threadsafe (CASSANDRA-11048)
* Fix SELECT on tuple relations for mixed ASC/DESC clustering order (CASSANDRA-7281)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4e6f08d/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 1618516,e81e4e9..691bf45
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1175,13 -1154,7 +1175,7 @@@ public abstract class SSTableReader ext
else if (samplingLevel < indexSummary.getSamplingLevel())
{
// we can use the existing index summary to make a smaller one
- newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
-
- try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
- SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
- {
- saveSummary(ibuilder, dbuilder, newSummary);
- }
}
else
{
@@@ -1189,6 -1162,18 +1183,13 @@@
"no adjustments to min/max_index_interval");
}
+ //Always save the resampled index
+ try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
+ SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
+ {
- for (long boundry : dfile.copyReadableBounds())
- dbuilder.addPotentialBoundary(boundry);
- for (long boundry : ifile.copyReadableBounds())
- ibuilder.addPotentialBoundary(boundry);
-
+ saveSummary(ibuilder, dbuilder, newSummary);
+ }
+
long newSize = bytesOnDisk();
StorageMetrics.load.inc(newSize - oldSize);
parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4e6f08d/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index c7f3c36,0000000..640b68b
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@@ -1,567 -1,0 +1,624 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner.LocalToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.MmappedRegions;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class SSTableReaderTest
+{
+ public static final String KEYSPACE1 = "SSTableReaderTest";
+ public static final String CF_STANDARD = "Standard1";
+ public static final String CF_STANDARD2 = "Standard2";
+ public static final String CF_INDEXED = "Indexed1";
+ public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+ private IPartitioner partitioner;
+
+ Token t(int i)
+ {
+ return partitioner.getToken(ByteBufferUtil.bytes(String.valueOf(i)));
+ }
+
+ @BeforeClass
+ public static void defineSchema() throws Exception
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
+ SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED, true),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLOWINDEXINTERVAL)
+ .minIndexInterval(8)
+ .maxIndexInterval(256)
+ .caching(CachingParams.CACHE_NOTHING));
+ }
+
+ @Test
+ public void testGetPositionsForRanges()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ partitioner = store.getPartitioner();
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+ new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+ // 1 key
+ ranges.add(new Range<>(t(0), t(1)));
+ // 2 keys
+ ranges.add(new Range<>(t(2), t(4)));
+ // wrapping range from key to end
+ ranges.add(new Range<>(t(6), partitioner.getMinimumToken()));
+ // empty range (should be ignored)
+ ranges.add(new Range<>(t(9), t(91)));
+
+ // confirm that positions increase continuously
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+ long previous = -1;
+ for (Pair<Long,Long> section : sstable.getPositionsForRanges(ranges))
+ {
+ assert previous <= section.left : previous + " ! < " + section.left;
+ assert section.left < section.right : section.left + " ! < " + section.right;
+ previous = section.right;
+ }
+ }
+
+ @Test
+ public void testSpannedIndexPositions() throws IOException
+ {
++ int originalMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
+ MmappedRegions.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
+
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
- partitioner = store.getPartitioner();
-
- // insert a bunch of data and compact to a single sstable
- CompactionManager.instance.disableAutoCompaction();
- for (int j = 0; j < 100; j += 2)
++ try
+ {
- new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
- .clustering("0")
- .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
- .build()
- .applyUnsafe();
- }
- store.forceBlockingFlush();
- CompactionManager.instance.performMaximal(store, false);
++ Keyspace keyspace = Keyspace.open(KEYSPACE1);
++ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
++ partitioner = store.getPartitioner();
+
- // check that all our keys are found correctly
- SSTableReader sstable = store.getLiveSSTables().iterator().next();
- for (int j = 0; j < 100; j += 2)
- {
- DecoratedKey dk = Util.dk(String.valueOf(j));
- FileDataInput file = sstable.getFileDataInput(sstable.getPosition(dk, SSTableReader.Operator.EQ).position);
- DecoratedKey keyInDisk = sstable.decorateKey(ByteBufferUtil.readWithShortLength(file));
- assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath());
- }
++ // insert a bunch of data and compact to a single sstable
++ CompactionManager.instance.disableAutoCompaction();
++ for (int j = 0; j < 100; j += 2)
++ {
++ new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
++ .clustering("0")
++ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
++ .build()
++ .applyUnsafe();
++ }
++ store.forceBlockingFlush();
++ CompactionManager.instance.performMaximal(store, false);
++
++ // check that all our keys are found correctly
++ SSTableReader sstable = store.getLiveSSTables().iterator().next();
++ for (int j = 0; j < 100; j += 2)
++ {
++ DecoratedKey dk = Util.dk(String.valueOf(j));
++ FileDataInput file = sstable.getFileDataInput(sstable.getPosition(dk, SSTableReader.Operator.EQ).position);
++ DecoratedKey keyInDisk = sstable.decorateKey(ByteBufferUtil.readWithShortLength(file));
++ assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath());
++ }
+
- // check no false positives
- for (int j = 1; j < 110; j += 2)
++ // check no false positives
++ for (int j = 1; j < 110; j += 2)
++ {
++ DecoratedKey dk = Util.dk(String.valueOf(j));
++ assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == null;
++ }
++ }
++ finally
+ {
- DecoratedKey dk = Util.dk(String.valueOf(j));
- assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == null;
++ MmappedRegions.MAX_SEGMENT_SIZE = originalMaxSegmentSize;
+ }
+ }
+
+ @Test
+ public void testPersistentStatistics()
+ {
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ partitioner = store.getPartitioner();
+
+ for (int j = 0; j < 100; j += 2)
+ {
+ new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush();
+
+ clearAndLoad(store);
+ assert store.metric.maxPartitionSize.getValue() != 0;
+ }
+
+ private void clearAndLoad(ColumnFamilyStore cfs)
+ {
+ cfs.clearUnsafe();
+ cfs.loadNewSSTables();
+ }
+
+ @Test
+ public void testReadRateTracking()
+ {
+ // try to make sure CASSANDRA-8239 never happens again
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ partitioner = store.getPartitioner();
+
+ for (int j = 0; j < 10; j++)
+ {
+ new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+
+ store.forceBlockingFlush();
+
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+ assertEquals(0, sstable.getReadMeter().count());
+
+ DecoratedKey key = sstable.decorateKey(ByteBufferUtil.bytes("4"));
+ Util.getAll(Util.cmd(store, key).build());
+ assertEquals(1, sstable.getReadMeter().count());
+
+ Util.getAll(Util.cmd(store, key).includeRow("0").build());
+ assertEquals(2, sstable.getReadMeter().count());
+ }
+
+ @Test
+ public void testGetPositionsForRangesWithKeyCache()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ partitioner = store.getPartitioner();
+ CacheService.instance.keyCache.setCapacity(100);
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+
+ new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+ long p2 = sstable.getPosition(k(2), SSTableReader.Operator.EQ).position;
+ long p3 = sstable.getPosition(k(3), SSTableReader.Operator.EQ).position;
+ long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position;
+ long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position;
+
+ Pair<Long, Long> p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0);
+
+ // range are start exclusive so we should start at 3
+ assert p.left == p3;
+
+ // to capture 6 we have to stop at the start of 7
+ assert p.right == p7;
+ }
+
+ @Test
+ public void testPersistentStatisticsWithSecondaryIndex()
+ {
+ // Create secondary index and flush to disk
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_INDEXED);
+ partitioner = store.getPartitioner();
+
+ new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), "k1")
+ .clustering("0")
+ .add("birthdate", 1L)
+ .build()
+ .applyUnsafe();
+
+ store.forceBlockingFlush();
+
+ // check if opening and querying works
+ assertIndexQueryWorks(store);
+ }
+ public void testGetPositionsKeyCacheStats()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ partitioner = store.getPartitioner();
+ CacheService.instance.keyCache.setCapacity(1000);
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+ new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+ sstable.getPosition(k(2), SSTableReader.Operator.EQ);
+ assertEquals(0, sstable.getKeyCacheHit());
+ assertEquals(1, sstable.getBloomFilterTruePositiveCount());
+ sstable.getPosition(k(2), SSTableReader.Operator.EQ);
+ assertEquals(1, sstable.getKeyCacheHit());
+ assertEquals(2, sstable.getBloomFilterTruePositiveCount());
+ sstable.getPosition(k(15), SSTableReader.Operator.EQ);
+ assertEquals(1, sstable.getKeyCacheHit());
+ assertEquals(2, sstable.getBloomFilterTruePositiveCount());
+
+ }
+
+
+ @Test
+ public void testOpeningSSTable() throws Exception
+ {
+ String ks = KEYSPACE1;
+ String cf = "Standard1";
+
+ // clear and create just one sstable for this test
+ Keyspace keyspace = Keyspace.open(ks);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
+ store.clearUnsafe();
+ store.disableAutoCompaction();
+
+ DecoratedKey firstKey = null, lastKey = null;
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < store.metadata.params.minIndexInterval; i++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(i));
+ if (firstKey == null)
+ firstKey = key;
+ if (lastKey == null)
+ lastKey = key;
+ if (store.metadata.getKeyValidator().compare(lastKey.getKey(), key.getKey()) < 0)
+ lastKey = key;
+
+
+ new RowUpdateBuilder(store.metadata, timestamp, key.getKey())
+ .clustering("col")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+
+ }
+ store.forceBlockingFlush();
+
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+ Descriptor desc = sstable.descriptor;
+
+ // test to see if sstable can be opened as expected
+ SSTableReader target = SSTableReader.open(desc);
+ Assert.assertEquals(target.getIndexSummarySize(), 1);
+ Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.getKey()), target.getIndexSummaryKey(0));
+ assert target.first.equals(firstKey);
+ assert target.last.equals(lastKey);
+ target.selfRef().release();
+ }
+
+ @Test
+ public void testLoadingSummaryUsesCorrectPartitioner() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Indexed1");
+
+ new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), "k1")
+ .clustering("0")
+ .add("birthdate", 1L)
+ .build()
+ .applyUnsafe();
+
+ store.forceBlockingFlush();
+
+ for(ColumnFamilyStore indexCfs : store.indexManager.getAllIndexColumnFamilyStores())
+ {
+ assert indexCfs.isIndex();
+ SSTableReader sstable = indexCfs.getLiveSSTables().iterator().next();
+ assert sstable.first.getToken() instanceof LocalToken;
+
+ try (SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(),
+ false);
+ SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(),
+ sstable.compression))
+ {
+ sstable.saveSummary(ibuilder, dbuilder);
+ }
+ SSTableReader reopened = SSTableReader.open(sstable.descriptor);
+ assert reopened.first.getToken() instanceof LocalToken;
+ reopened.selfRef().release();
+ }
+ }
+
+ /** see CASSANDRA-5407 */
+ @Test
+ public void testGetScannerForNoIntersectingRanges() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ partitioner = store.getPartitioner();
+
+ new RowUpdateBuilder(store.metadata, 0, "k1")
+ .clustering("xyz")
+ .add("val", "abc")
+ .build()
+ .applyUnsafe();
+
+ store.forceBlockingFlush();
+ boolean foundScanner = false;
+ for (SSTableReader s : store.getLiveSSTables())
+ {
+ try (ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null))
+ {
+ scanner.next(); // throws exception pre 5407
+ foundScanner = true;
+ }
+ }
+ assertTrue(foundScanner);
+ }
+
+ @Test
+ public void testGetPositionsForRangesFromTableOpenedForBulkLoading() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ partitioner = store.getPartitioner();
+
+ // insert data and compact to a single sstable. The
+ // number of keys inserted is greater than index_interval
+ // to ensure multiple segments in the index file
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 130; j++)
+ {
+
+ new RowUpdateBuilder(store.metadata, j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ // construct a range which is present in the sstable, but whose
+ // keys are not found in the first segment of the index.
+ List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+ ranges.add(new Range<Token>(t(98), t(99)));
+
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+ List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
+ assert sections.size() == 1 : "Expected to find range in sstable" ;
+
+ // re-open the same sstable as it would be during bulk loading
+ Set<Component> components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
+ if (sstable.components.contains(Component.COMPRESSION_INFO))
+ components.add(Component.COMPRESSION_INFO);
+ SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, store.metadata);
+ sections = bulkLoaded.getPositionsForRanges(ranges);
+ assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading";
+ bulkLoaded.selfRef().release();
+ }
+
+ @Test
+ public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching
+ CompactionManager.instance.disableAutoCompaction();
+
+ final int NUM_PARTITIONS = 512;
+ for (int j = 0; j < NUM_PARTITIONS; j++)
+ {
+ new RowUpdateBuilder(store.metadata, j, String.format("%3d", j))
+ .clustering("0")
+ .add("val", String.format("%3d", j))
+ .build()
+ .applyUnsafe();
+
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ Collection<SSTableReader> sstables = store.getLiveSSTables();
+ assert sstables.size() == 1;
+ final SSTableReader sstable = sstables.iterator().next();
+
+ ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
+ List<Future> futures = new ArrayList<>(NUM_PARTITIONS * 2);
+ for (int i = 0; i < NUM_PARTITIONS; i++)
+ {
+ final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", i));
+ final int index = i;
+
+ futures.add(executor.submit(new Runnable()
+ {
+ public void run()
+ {
+ Row row = Util.getOnlyRowUnfiltered(Util.cmd(store, key).build());
+ assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), row.cells().iterator().next().value()));
+ }
+ }));
+
+ futures.add(executor.submit(new Runnable()
+ {
+ public void run()
+ {
+ Iterable<DecoratedKey> results = store.keySamples(
+ new Range<>(sstable.getPartitioner().getMinimumToken(), sstable.getPartitioner().getToken(key)));
+ assertTrue(results.iterator().hasNext());
+ }
+ }));
+ }
+
+ SSTableReader replacement;
+ try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN))
+ {
+ replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
+ txn.update(replacement, true);
+ txn.finish();
+ }
+ for (Future future : futures)
+ future.get();
+
+ assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1);
+ }
+
++ @Test
++ public void testIndexSummaryUpsampleAndReload() throws Exception
++ {
++ int originalMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
++ MmappedRegions.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
++
++ try
++ {
++ testIndexSummaryUpsampleAndReload0();
++ }
++ finally
++ {
++ MmappedRegions.MAX_SEGMENT_SIZE = originalMaxSegmentSize;
++ }
++ }
++
++ private void testIndexSummaryUpsampleAndReload0() throws Exception
++ {
++ Keyspace keyspace = Keyspace.open(KEYSPACE1);
++ final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching
++ CompactionManager.instance.disableAutoCompaction();
++
++ final int NUM_PARTITIONS = 512;
++ for (int j = 0; j < NUM_PARTITIONS; j++)
++ {
++ new RowUpdateBuilder(store.metadata, j, String.format("%3d", j))
++ .clustering("0")
++ .add("val", String.format("%3d", j))
++ .build()
++ .applyUnsafe();
++
++ }
++ store.forceBlockingFlush();
++ CompactionManager.instance.performMaximal(store, false);
++
++ Collection<SSTableReader> sstables = store.getLiveSSTables();
++ assert sstables.size() == 1;
++ final SSTableReader sstable = sstables.iterator().next();
++
++ try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN))
++ {
++ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, sstable.getIndexSummarySamplingLevel() + 1);
++ txn.update(replacement, true);
++ txn.finish();
++ }
++ SSTableReader reopen = SSTableReader.open(sstable.descriptor);
++ assert reopen.getIndexSummarySamplingLevel() == sstable.getIndexSummarySamplingLevel() + 1;
++ }
++
+ private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS)
+ {
+ assert "Indexed1".equals(indexedCFS.name);
+
+ // make sure all sstables including 2ary indexes load from disk
+ for (ColumnFamilyStore cfs : indexedCFS.concatWithIndexes())
+ clearAndLoad(cfs);
+
+
+ // query using index to see if sstable for secondary index opens
+ ReadCommand rc = Util.cmd(indexedCFS).fromKeyIncl("k1").toKeyIncl("k3")
+ .columns("birthdate")
+ .filterOn("birthdate", Operator.EQ, 1L)
+ .build();
+ Index.Searcher searcher = indexedCFS.indexManager.getBestIndexFor(rc).searcherFor(rc);
+ assertNotNull(searcher);
+ try (ReadOrderGroup orderGroup = ReadOrderGroup.forCommand(rc))
+ {
+ assertEquals(1, Util.size(UnfilteredPartitionIterators.filter(searcher.search(orderGroup), rc.nowInSec())));
+ }
+ }
+
+ private List<Range<Token>> makeRanges(Token left, Token right)
+ {
+ return Arrays.asList(new Range<>(left, right));
+ }
+
+ private DecoratedKey k(int i)
+ {
+ return new BufferDecoratedKey(t(i), ByteBufferUtil.bytes(String.valueOf(i)));
+ }
+}