You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/11 23:18:20 UTC
[10/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0de23f20
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0de23f20
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0de23f20
Branch: refs/heads/trunk
Commit: 0de23f20ae4bd95f040017e2db653c6c1b5eabe9
Parents: 9a90e98 e487553
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Nov 11 16:16:23 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 11 16:16:23 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 34 +++++++++++
.../db/compaction/CompactionController.java | 5 --
src/java/org/apache/cassandra/dht/Bounds.java | 62 ++++++++++++++++++++
.../cassandra/streaming/StreamReader.java | 12 ++--
.../cassandra/streaming/StreamReceiveTask.java | 37 +++++++++++-
.../compress/CompressedStreamReader.java | 2 +-
.../apache/cassandra/db/CounterCacheTest.java | 48 +++++++++++++++
.../org/apache/cassandra/db/RowCacheTest.java | 50 ++++++++++++++++
.../org/apache/cassandra/dht/BoundsTest.java | 61 +++++++++++++++++++
10 files changed, 298 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d271c95,0fcf037..02dc249
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,50 -1,6 +1,51 @@@
-2.2.4
+3.0.1
+ * Keep the file open in trySkipCache (CASSANDRA-10669)
+ * Updated trigger example (CASSANDRA-10257)
+Merged from 2.2:
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+Merged from 2.1:
++ * Invalidate cache after stream receive task is completed (CASSANDRA-10341)
+ * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258)
+ * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605)
+
+
+3.0
+ * Fix AssertionError while flushing memtable due to materialized views
+ incorrectly inserting empty rows (CASSANDRA-10614)
+ * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650)
+ * Don't use -1 for the position of partition key in schema (CASSANDRA-10491)
+ * Fix distinct queries in mixed version cluster (CASSANDRA-10573)
+ * Skip sstable on clustering in names query (CASSANDRA-10571)
+ * Remove value skipping as it breaks read-repair (CASSANDRA-10655)
+ * Fix bootstrapping with MVs (CASSANDRA-10621)
+ * Make sure EACH_QUORUM reads are using NTS (CASSANDRA-10584)
+ * Fix MV replica filtering for non-NetworkTopologyStrategy (CASSANDRA-10634)
+ * (Hadoop) fix CIF describeSplits() not handling 0 size estimates (CASSANDRA-10600)
+ * Fix reading of legacy sstables (CASSANDRA-10590)
+ * Use CQL type names in schema metadata tables (CASSANDRA-10365)
+ * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
+ * Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608)
+ * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)
+ * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
+ * Don't use 'names query' read path for counters (CASSANDRA-10572)
+ * Fix backward compatibility for counters (CASSANDRA-10470)
+ * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581,10628)
+ * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
+ * Fix thrift cas operations with defined columns (CASSANDRA-10576)
+ * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606)
+ * Fix thrift get() queries with defined columns (CASSANDRA-10586)
+ * Fix marking of indexes as built and removed (CASSANDRA-10601)
+ * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
+ * Fix batches on multiple tables (CASSANDRA-10554)
+ * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
+ * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
+ * Remove token generator (CASSANDRA-5261)
+ * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
+ * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
+ * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
+ * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
+Merged from 2.2:
* (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
* Use most up-to-date version of schema for system tables (CASSANDRA-10652)
* Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0b838bf,2d58219..38c99ea
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1739,6 -2519,41 +1739,40 @@@ public class ColumnFamilyStore implemen
CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName);
}
+ public int invalidateRowCache(Collection<Bounds<Token>> boundsToInvalidate)
+ {
+ int invalidatedKeys = 0;
+ for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
+ keyIter.hasNext(); )
+ {
+ RowCacheKey key = keyIter.next();
- DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
++ DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key));
+ if (key.ksAndCFName.equals(metadata.ksAndCFName) && Bounds.isInBounds(dk.getToken(), boundsToInvalidate))
+ {
- invalidateCachedRow(dk);
++ invalidateCachedPartition(dk);
+ invalidatedKeys++;
+ }
+ }
-
+ return invalidatedKeys;
+ }
+
+ public int invalidateCounterCache(Collection<Bounds<Token>> boundsToInvalidate)
+ {
+ int invalidatedKeys = 0;
+ for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator();
+ keyIter.hasNext(); )
+ {
+ CounterCacheKey key = keyIter.next();
- DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
++ DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey));
+ if (key.ksAndCFName.equals(metadata.ksAndCFName) && Bounds.isInBounds(dk.getToken(), boundsToInvalidate))
+ {
+ CacheService.instance.counterCache.remove(key);
+ invalidatedKeys++;
+ }
+ }
+ return invalidatedKeys;
+ }
+
/**
* @return true if @param key is contained in the row cache
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/Bounds.java
index d9c189d,73414cd..a125168
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@@ -17,10 -17,19 +17,19 @@@
*/
package org.apache.cassandra.dht;
+ import java.util.ArrayList;
+ import java.util.Collection;
import java.util.Collections;
+ import java.util.Comparator;
import java.util.List;
+ import java.util.Set;
+
+ import com.google.common.collect.Iterators;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.PeekingIterator;
+ import com.google.common.collect.Sets;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.utils.Pair;
/**
@@@ -102,16 -111,20 +111,30 @@@ public class Bounds<T extends RingPosit
return "]";
}
+ public static <T extends RingPosition<T>> boolean isInBounds(T token, Iterable<Bounds<T>> bounds)
+ {
+ assert bounds != null;
+
+ for (Bounds<T> bound : bounds)
+ {
+ if (bound.contains(token))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean isStartInclusive()
+ {
+ return true;
+ }
+
+ public boolean isEndInclusive()
+ {
+ return true;
+ }
+
/**
* Compute a bounds of keys corresponding to a given bounds of token.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 6169494,fe3b13d..4a38d5b
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -106,7 -102,8 +106,7 @@@ public class StreamReade
writer = createWriter(cfs, totalSize, repairedAt, format);
while (in.getBytesRead() < totalSize)
{
- writePartition(deserializer, writer, cfs);
- writeRow(writer, in, cfs);
-
++ writePartition(deserializer, writer);
// TODO move this to BytesReadTracker
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
@@@ -167,122 -167,9 +167,120 @@@
return size;
}
- protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer, ColumnFamilyStore cfs) throws IOException
- protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
++ protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException
{
- DecoratedKey key = deserializer.newPartition();
- writer.append(deserializer);
- DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
- writer.appendFromStream(key, cfs.metadata, in, inputVersion);
++ writer.append(deserializer.newPartition());
+ deserializer.checkForExceptions();
- cfs.invalidateCachedPartition(key);
+ }
+
+ public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
+ {
+ private final CFMetaData metadata;
+ private final DataInputPlus in;
+ private final SerializationHeader header;
+ private final SerializationHelper helper;
+
+ private DecoratedKey key;
+ private DeletionTime partitionLevelDeletion;
+ private SSTableSimpleIterator iterator;
+ private Row staticRow;
+ private IOException exception;
+
+ public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version, SerializationHeader header)
+ {
+ assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes";
+ this.metadata = metadata;
+ this.in = in;
+ this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
+ this.header = header;
+ }
+
- public DecoratedKey newPartition() throws IOException
++ public StreamDeserializer newPartition() throws IOException
+ {
+ key = metadata.decorateKey(ByteBufferUtil.readWithShortLength(in));
+ partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
+ iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
+ staticRow = iterator.readStaticRow();
- return key;
++ return this;
+ }
+
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
+ public PartitionColumns columns()
+ {
+ // We don't know which columns we'll get so assume it can be all of them
+ return metadata.partitionColumns();
+ }
+
+ public boolean isReverseOrder()
+ {
+ return false;
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return key;
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return partitionLevelDeletion;
+ }
+
+ public Row staticRow()
+ {
+ return staticRow;
+ }
+
+ public EncodingStats stats()
+ {
+ return header.stats();
+ }
+
+ public boolean hasNext()
+ {
+ try
+ {
+ return iterator.hasNext();
+ }
+ catch (IOError e)
+ {
+ if (e.getCause() != null && e.getCause() instanceof IOException)
+ {
+ exception = (IOException)e.getCause();
+ return false;
+ }
+ throw e;
+ }
+ }
+
+ public Unfiltered next()
+ {
+ // Note that in practice we know that IOException will be thrown by hasNext(), because that's
+ // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily
+ // to what we do in hasNext)
+ Unfiltered unfiltered = iterator.next();
+ return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
+ ? maybeMarkLocalToBeCleared((Row) unfiltered)
+ : unfiltered;
+ }
+
+ private Row maybeMarkLocalToBeCleared(Row row)
+ {
+ return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row;
+ }
+
+ public void checkForExceptions() throws IOException
+ {
+ if (exception != null)
+ throw exception;
+ }
+
+ public void close()
+ {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0b864fa,846524b..54ce711
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -29,17 -35,12 +36,19 @@@ import org.apache.cassandra.concurrent.
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.View;
+ import org.apache.cassandra.dht.Bounds;
+ import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
-
import org.apache.cassandra.utils.concurrent.Refs;
/**
@@@ -128,66 -122,48 +137,92 @@@ public class StreamReceiveTask extends
return;
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
- StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
- lockfile.create(task.sstables);
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
- readers.add(writer.finish(true));
- lockfile.delete();
- task.sstables.clear();
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
+ try
{
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
-
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ List<SSTableReader> readers = new ArrayList<>();
+ for (SSTableMultiWriter writer : task.sstables)
{
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- for (SSTableReader sstable : readers)
- boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ Collection<SSTableReader> newReaders = writer.finish(true);
+ readers.addAll(newReaders);
+ task.txn.update(newReaders, false);
+ }
+
+ task.sstables.clear();
- if (cfs.isRowCacheEnabled())
+ try (Refs<SSTableReader> refs = Refs.ref(readers))
+ {
+ //We have a special path for views.
+ //Since the view requires cleaning up any pre-existing state, we must put
+ //all partitions through the same write path as normal mutations.
+ //This also ensures any 2is are also updated
+ if (hasViews)
+ {
+ for (SSTableReader reader : readers)
+ {
+ try (ISSTableScanner scanner = reader.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator rowIterator = scanner.next())
+ {
+ //Apply unsafe (we will flush below before transaction is done)
+ new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ }
+ }
+ }
+ }
+ }
+ else
{
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ task.txn.finish();
+
+ // add sstables and build secondary indexes
+ cfs.addSSTables(readers);
+ cfs.indexManager.buildAllIndexesBlocking(readers);
++
++ //invalidate row and counter cache
++ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
++ {
++ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
++ readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
++ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
++
++ if (cfs.isRowCacheEnabled())
++ {
++ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
++ if (invalidatedKeys > 0)
++ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
++ "receive task completed.", task.session.planId(), invalidatedKeys,
++ cfs.keyspace.getName(), cfs.getTableName());
++ }
++
++ if (cfs.metadata.isCounter())
++ {
++ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
++ if (invalidatedKeys > 0)
++ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
++ "receive task completed.", task.session.planId(), invalidatedKeys,
++ cfs.keyspace.getName(), cfs.getTableName());
++ }
++ }
}
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error applying streamed sstable: ", t);
- if (cfs.metadata.isCounter())
+ JVMStabilityInspector.inspectThrowable(t);
+ }
+ finally
+ {
+ //We don't keep the streamed sstables since we've applied them manually
+ //So we abort the txn and delete the streamed sstables
+ if (hasViews)
{
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ cfs.forceBlockingFlush();
+ task.txn.abort();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fca6aa7,facb906..8f53832
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -93,7 -92,8 +93,7 @@@ public class CompressedStreamReader ext
while (in.getBytesRead() < sectionLength)
{
- writePartition(deserializer, writer, cfs);
- writeRow(writer, in, cfs);
-
++ writePartition(deserializer, writer);
// when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 65ec420,ed7921e..91157ad
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@@ -17,12 -17,9 +17,15 @@@
*/
package org.apache.cassandra.db;
+ import java.util.Collections;
import java.util.concurrent.ExecutionException;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.dht.Bounds;
++import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@@ -95,9 -89,51 +98,54 @@@ public class CounterCacheTes
}
@Test
+ public void testCounterCacheInvalidate()
+ {
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+ cfs.truncateBlocking();
+ CacheService.instance.invalidateCounterCache();
+
++ Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build();
++ Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build();
++ ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));
++
+ assertEquals(0, CacheService.instance.counterCache.size());
- assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
- assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
- assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
- assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
- assertNull(cfs.getCachedCounter(bytes(3), cellname(1)));
- assertNull(cfs.getCachedCounter(bytes(3), cellname(2)));
-
- cfs.putCachedCounter(bytes(1), cellname(1), ClockAndCount.create(1L, 1L));
- cfs.putCachedCounter(bytes(1), cellname(2), ClockAndCount.create(1L, 2L));
- cfs.putCachedCounter(bytes(2), cellname(1), ClockAndCount.create(2L, 1L));
- cfs.putCachedCounter(bytes(2), cellname(2), ClockAndCount.create(2L, 2L));
- cfs.putCachedCounter(bytes(3), cellname(1), ClockAndCount.create(3L, 1L));
- cfs.putCachedCounter(bytes(3), cellname(2), ClockAndCount.create(3L, 2L));
-
- assertEquals(6, CacheService.instance.counterCache.size());
- assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
- assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
- assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
- assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
- assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), cellname(1)));
- assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), cellname(2)));
-
- cfs.invalidateCounterCache(Collections.singleton(new Bounds<Token>(cfs.partitioner.decorateKey(bytes(1)).getToken(),
- cfs.partitioner.decorateKey(bytes(2)).getToken())));
++ assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null));
++ assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null));
++ assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null));
++ assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null));
++ assertNull(cfs.getCachedCounter(bytes(3), c1, cd, null));
++ assertNull(cfs.getCachedCounter(bytes(3), c2, cd, null));
++
++ cfs.putCachedCounter(bytes(1), c1, cd, null, ClockAndCount.create(1L, 1L));
++ cfs.putCachedCounter(bytes(1), c2, cd, null, ClockAndCount.create(1L, 2L));
++ cfs.putCachedCounter(bytes(2), c1, cd, null, ClockAndCount.create(2L, 1L));
++ cfs.putCachedCounter(bytes(2), c2, cd, null, ClockAndCount.create(2L, 2L));
++ cfs.putCachedCounter(bytes(3), c1, cd, null, ClockAndCount.create(3L, 1L));
++ cfs.putCachedCounter(bytes(3), c2, cd, null, ClockAndCount.create(3L, 2L));
++
++ assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd, null));
++ assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd, null));
++ assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), c1, cd, null));
++ assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), c2, cd, null));
++ assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), c1, cd, null));
++ assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), c2, cd, null));
++
++ cfs.invalidateCounterCache(Collections.singleton(new Bounds<Token>(cfs.decorateKey(bytes(1)).getToken(),
++ cfs.decorateKey(bytes(2)).getToken())));
+
+ assertEquals(2, CacheService.instance.counterCache.size());
- assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
- assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
- assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
- assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
- assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), cellname(1)));
- assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), cellname(2)));
++ assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null));
++ assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null));
++ assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null));
++ assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null));
++ assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), c1, cd, null));
++ assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), c2, cd, null));
+ }
+
+ @Test
public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
{
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
cfs.truncateBlocking();
CacheService.instance.invalidateCounterCache();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java
index d407f7a,9fb322b..b157adc
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@@ -20,22 -20,28 +20,27 @@@ package org.apache.cassandra.db
import java.net.InetAddress;
import java.nio.ByteBuffer;
+ import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Iterator;
+ import java.util.TreeSet;
+ import com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
-import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.cache.RowCacheKey;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.partitions.CachedPartition;
+ import org.apache.cassandra.dht.Bounds;
+ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
import org.apache.cassandra.locator.TokenMetadata;
@@@ -230,6 -178,51 +235,51 @@@ public class RowCacheTes
}
@Test
+ public void testInvalidateRowCache() throws Exception
+ {
+ StorageService.instance.initServer(0);
+ CacheService.instance.setRowCacheCapacityInMB(1);
+ rowCacheLoad(100, Integer.MAX_VALUE, 1000);
+
+ ColumnFamilyStore store = Keyspace.open(KEYSPACE_CACHED).getColumnFamilyStore(CF_CACHED);
+ assertEquals(CacheService.instance.rowCache.size(), 100);
+
- //construct 5 ranges of 20 elements each
++ //construct 5 bounds of 20 elements each
+ ArrayList<Bounds<Token>> subranges = getBounds(20);
+
- //invalidate 3 of the 5 ranges
++ //invalidate 3 of the 5 bounds
+ ArrayList<Bounds<Token>> boundsToInvalidate = Lists.newArrayList(subranges.get(0), subranges.get(2), subranges.get(4));
+ int invalidatedKeys = store.invalidateRowCache(boundsToInvalidate);
+ assertEquals(60, invalidatedKeys);
+
+ //now there should be only 40 cached entries left
+ assertEquals(CacheService.instance.rowCache.size(), 40);
+ CacheService.instance.setRowCacheCapacityInMB(0);
+ }
+
+ private ArrayList<Bounds<Token>> getBounds(int nElements)
+ {
+ ColumnFamilyStore store = Keyspace.open(KEYSPACE_CACHED).getColumnFamilyStore(CF_CACHED);
+ TreeSet<DecoratedKey> orderedKeys = new TreeSet<>();
+
+ for(Iterator<RowCacheKey> it = CacheService.instance.rowCache.keyIterator();it.hasNext();)
- orderedKeys.add(store.partitioner.decorateKey(ByteBuffer.wrap(it.next().key)));
++ orderedKeys.add(store.decorateKey(ByteBuffer.wrap(it.next().key)));
+
+ ArrayList<Bounds<Token>> boundsToInvalidate = new ArrayList<>();
+ Iterator<DecoratedKey> iterator = orderedKeys.iterator();
+
+ while (iterator.hasNext())
+ {
+ Token startRange = iterator.next().getToken();
+ for (int i = 0; i < nElements-2; i++)
+ iterator.next();
+ Token endRange = iterator.next().getToken();
+ boundsToInvalidate.add(new Bounds<>(startRange, endRange));
+ }
+ return boundsToInvalidate;
+ }
+
+ @Test
public void testRowCachePartialLoad() throws Exception
{
CacheService.instance.setRowCacheCapacityInMB(1);