You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/09/16 22:05:01 UTC
[8/9] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index a48466a,a13a52d..a3e7d12
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -21,9 -22,9 +21,8 @@@ import java.io.IOException
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
- import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@@ -363,48 -360,33 +364,53 @@@ public class CacheService implements Ca
ByteBufferUtil.writeWithLength(key.cellName, out);
}
- public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
+ public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
{
+ //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+ //parameter so they aren't deserialized here, even though they are serialized by this serializer
final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
- ByteBuffer cellNameBuffer = ByteBufferUtil.readWithLength(in);
+ final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
+ if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled())
+ return null;
+ assert(cfs.metadata.isCounter());
- final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(cellNameBuffer);
return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
{
public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
{
- DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
- QueryFilter filter = QueryFilter.getNamesFilter(key,
- cfs.metadata.cfName,
- FBUtilities.singleton(cellName, cfs.metadata.comparator),
- Long.MIN_VALUE);
- ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
- if (cf == null)
- return null;
- Cell cell = cf.getColumn(cellName);
- if (cell == null || !cell.isLive(Long.MIN_VALUE))
- return null;
- ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
- return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, cellName), clockAndCount);
+ DecoratedKey key = cfs.decorateKey(partitionKey);
+ LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
+ ColumnDefinition column = name.column;
+ CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+
+ int nowInSec = FBUtilities.nowInSeconds();
+ ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+ if (path == null)
+ builder.add(column);
+ else
+ builder.select(column, path);
+
+ ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(name.clustering, cfs.metadata.comparator), false);
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key, builder.build(), filter);
+ try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator iter = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec))
+ {
+ Cell cell;
+ if (column.isStatic())
+ {
+ cell = iter.staticRow().getCell(column);
+ }
+ else
+ {
+ if (!iter.hasNext())
+ return null;
+ cell = iter.next().getCell(column);
+ }
+
+ if (cell == null)
+ return null;
+
+ ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
- return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, name.clustering, column, path), clockAndCount);
++ return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, name.clustering, column, path), clockAndCount);
+ }
}
});
}
@@@ -412,27 -394,30 +418,34 @@@
public static class RowCacheSerializer implements CacheSerializer<RowCacheKey, IRowCacheEntry>
{
- public void serialize(RowCacheKey key, DataOutputPlus out) throws IOException
+ public void serialize(RowCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
{
- assert(!cfs.isIndex());
++ assert(!cfs.isIndex());//Shouldn't have row cache entries for indexes
+ out.write(cfs.metadata.ksAndCFBytes);
ByteBufferUtil.writeWithLength(key.key, out);
}
- public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
+ public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
{
+ //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+ //parameter so they aren't deserialized here, even though they are serialized by this serializer
final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+ final int rowsToCache = cfs.metadata.params.caching.rowsPerPartitionToCache();
+ if (cfs == null || !cfs.isRowCacheEnabled())
+ return null;
- assert(!cfs.isIndex());
++ assert(!cfs.isIndex());//Shouldn't have row cache entries for indexes
return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
{
public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
{
- DecoratedKey key = cfs.partitioner.decorateKey(buffer);
- QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
- ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
- return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data);
+ DecoratedKey key = cfs.decorateKey(buffer);
+ int nowInSec = FBUtilities.nowInSeconds();
+ try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
+ {
+ CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
- return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry)toCache);
++ return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache);
+ }
}
});
}
@@@ -453,11 -435,13 +463,13 @@@
ByteBufferUtil.writeWithLength(key.key, out);
out.writeInt(key.desc.generation);
out.writeBoolean(true);
- key.desc.getFormat().getIndexSerializer(cfm, key.desc.version, SerializationHeader.forKeyCache(cfm)).serialize(entry, out);
- key.desc.getFormat().getIndexSerializer(cfs.metadata).serialize(entry, out);
++ key.desc.getFormat().getIndexSerializer(cfs.metadata, key.desc.version, SerializationHeader.forKeyCache(cfs.metadata)).serialize(entry, out);
}
- public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
+ public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException
{
+ //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+ //parameter so they aren't deserialized here, even though they are serialized by this serializer
int keyLength = input.readInt();
if (keyLength > FBUtilities.MAX_UNSIGNED_SHORT)
{
@@@ -466,25 -450,18 +478,25 @@@
}
ByteBuffer key = ByteBufferUtil.read(input, keyLength);
int generation = input.readInt();
- SSTableReader reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL));
input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
- if (reader == null)
+ SSTableReader reader = null;
- if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables())) == null)
++ if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null)
{
- RowIndexEntry.Serializer.skip(input);
+ // The sstable doesn't exist anymore, so we can't be sure of the exact version and assume its the current version. The only case where we'll be
+ // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
+ // this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this
+ // part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
- RowIndexEntry.Serializer.skipPromotedIndex(input, BigFormat.instance.getLatestVersion());
++ RowIndexEntry.Serializer.skip(input, BigFormat.instance.getLatestVersion());
return null;
}
- RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version);
+ RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
+ reader.descriptor.version,
+ SerializationHeader.forKeyCache(cfs.metadata));
+ RowIndexEntry entry = indexSerializer.deserialize(input);
- return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
+ return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
}
- private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)
+ private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection)
{
for (SSTableReader sstable : collection)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 1408a70,075c8f7..f9ee9e8
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -26,8 -26,11 +26,10 @@@ import java.net.UnknownHostException
import java.rmi.registry.LocateRegistry;
import java.rmi.server.RMIServerSocketFactory;
import java.util.Collections;
+ import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
+
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
@@@ -40,8 -42,10 +42,10 @@@ import com.codahale.metrics.Meter
import com.codahale.metrics.MetricRegistryListener;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.metrics.DefaultNameFactory;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index f0ad46f,fa370dc..5e4ef39
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2652,15 -2624,15 +2659,15 @@@ public class StorageService extends Not
/**
* Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
- *
- *
+ *
+ *
* @param tag
* the tag given to the snapshot; may not be null or empty
- * @param columnFamilyList
- * list of columnfamily from different keyspace in the form of ks1.cf1 ks2.cf2
+ * @param tableList
+ * list of tables from different keyspace in the form of ks1.cf1 ks2.cf2
*/
@Override
- public void takeMultipleColumnFamilySnapshot(String tag, String... columnFamilyList)
+ public void takeMultipleTableSnapshot(String tag, String... tableList)
throws IOException
{
Map<Keyspace, List<String>> keyspaceColumnfamily = new HashMap<Keyspace, List<String>>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/FBUtilities.java
index c4b4193,a16fa13..83b07e0
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@@ -775,29 -793,19 +775,45 @@@ public class FBUtilitie
digest.update((byte) ((val >>> 0) & 0xFF));
}
+ public static void updateWithBoolean(MessageDigest digest, boolean val)
+ {
+ updateWithByte(digest, val ? 0 : 1);
+ }
+
+ public static void closeAll(List<? extends AutoCloseable> l) throws Exception
+ {
+ Exception toThrow = null;
+ for (AutoCloseable c : l)
+ {
+ try
+ {
+ c.close();
+ }
+ catch (Exception e)
+ {
+ if (toThrow == null)
+ toThrow = e;
+ else
+ toThrow.addSuppressed(e);
+ }
+ }
+ if (toThrow != null)
+ throw toThrow;
+ }
++
+ public static byte[] toWriteUTFBytes(String s)
+ {
+ try
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeUTF(s);
+ dos.flush();
+ return baos.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index c4157ea,475e436..0c7e8a5
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@@ -78,9 -76,8 +78,8 @@@ public class AutoSavingCacheTes
Assert.assertEquals(0, keyCache.size());
// then load saved
- keyCache.loadSaved(cfs);
- Assert.assertEquals(2, keyCache.size());
+ keyCache.loadSavedAsync().get();
- for (SSTableReader sstable : cfs.getSSTables())
+ for (SSTableReader sstable : cfs.getLiveSSTables())
- Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.cfId, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
+ Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.ksAndCFName, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 21a41c4,bfcfa59..cd52d35
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@@ -18,27 -19,27 +18,33 @@@
*/
package org.apache.cassandra.cache;
-
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
- import java.util.*;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.*;
+
++
++import java.util.ArrayList;
++import java.util.List;
++
+
+ import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.ColumnFamily;
+ import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.utils.Pair;
+
import com.googlecode.concurrentlinkedhashmap.Weighers;
-import static org.apache.cassandra.Util.column;
-import static org.junit.Assert.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.partitions.*;
- import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
public class CacheProviderTest
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
index 9fef63f,1a60d6d..861e840
--- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@@ -26,15 -25,19 +26,20 @@@ import java.util.List
import org.junit.Assert;
import org.junit.Test;
-import com.google.common.collect.ImmutableSet;
-
+ import org.apache.cassandra.cache.KeyCacheKey;
+ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.metrics.CacheMetrics;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.Pair;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.assertNull;
++import org.apache.cassandra.utils.Pair;
++
public class KeyCacheCqlTest extends CQLTester
{
@@@ -167,6 -71,49 +172,47 @@@
insertData(table, "some_index", true);
clearCache();
+ CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+
+ for (int i = 0; i < 10; i++)
+ {
+ UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+ assertEquals(500, result.size());
+ }
+
+ long hits = metrics.hits.getCount();
+ long requests = metrics.requests.getCount();
- assertEquals(4900, hits);
- assertEquals(5250, requests);
-
- //
++ assertEquals(0, hits);
++ assertEquals(210, requests);
+
+ for (int i = 0; i < 10; i++)
+ {
+ UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+ // 100 part-keys * 50 clust-keys
+ // indexed on part-key % 10 = 10 index partitions
+ // (50 clust-keys * 100-part-keys / 10 possible index-values) = 500
+ assertEquals(500, result.size());
+ }
+
+ metrics = CacheService.instance.keyCache.getMetrics();
+ hits = metrics.hits.getCount();
+ requests = metrics.requests.getCount();
- assertEquals(10000, hits);
- assertEquals(10500, requests);
++ assertEquals(200, hits);
++ assertEquals(420, requests);
+
+ CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+
+ int beforeSize = CacheService.instance.keyCache.size();
+
+ CacheService.instance.keyCache.clear();
+
+ Assert.assertEquals(0, CacheService.instance.keyCache.size());
+
+ // then load saved
+ CacheService.instance.keyCache.loadSaved();
+
+ assertEquals(beforeSize, CacheService.instance.keyCache.size());
+
for (int i = 0; i < 10; i++)
{
UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
@@@ -173,11 -123,44 +222,44 @@@
assertEquals(500, result.size());
}
+ //Test Schema.getColumnFamilyStoreIncludingIndexes, several null check paths
+ //are defensive and unreachable
+ assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo", "bar")));
+ assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE, "bar")));
+
+ dropTable("DROP TABLE %s");
+
+ //Test loading for a dropped 2i/table
+ CacheService.instance.keyCache.clear();
+
+ // then load saved
+ CacheService.instance.keyCache.loadSaved();
+
+ assertEquals(0, CacheService.instance.keyCache.size());
+ }
+
+ @Test
+ public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
+ {
+ String table = createTable("CREATE TABLE %s ("
+ + commonColumnsDef
+ + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+ createIndex("CREATE INDEX some_index ON %s (col_int)");
+ insertData(table, "some_index", true);
+ clearCache();
+
CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+
+ for (int i = 0; i < 10; i++)
+ {
+ UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+ assertEquals(500, result.size());
+ }
+
long hits = metrics.hits.getCount();
long requests = metrics.requests.getCount();
- assertEquals(4900, hits);
- assertEquals(5250, requests);
+ assertEquals(0, hits);
+ assertEquals(210, requests);
//
@@@ -193,110 -176,29 +275,129 @@@
metrics = CacheService.instance.keyCache.getMetrics();
hits = metrics.hits.getCount();
requests = metrics.requests.getCount();
- assertEquals(10000, hits);
- assertEquals(10500, requests);
+ assertEquals(200, hits);
+ assertEquals(420, requests);
+
+ dropTable("DROP TABLE %s");
+
+ CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+
+ CacheService.instance.keyCache.clear();
+
+ Assert.assertEquals(0, CacheService.instance.keyCache.size());
+
+ // then load saved
+ CacheService.instance.keyCache.loadSaved();
+
+ Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
+ while(iter.hasNext())
+ {
+ KeyCacheKey key = iter.next();
+ Assert.assertFalse(key.ksAndCFName.left.equals("KEYSPACE"));
+ Assert.assertFalse(key.ksAndCFName.right.startsWith(table));
+ }
}
+ @Test
+ public void testKeyCacheNonClustered() throws Throwable
+ {
+ String table = createTable("CREATE TABLE %s ("
+ + commonColumnsDef
+ + "PRIMARY KEY ((part_key_a, part_key_b)))");
+ insertData(table, null, false);
+ clearCache();
+
+ for (int i = 0; i < 10; i++)
+ {
+ assertRows(execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)),
+ new Object[]{ String.valueOf(i) + '-' + String.valueOf(0) });
+ }
+
+ CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+ long hits = metrics.hits.getCount();
+ long requests = metrics.requests.getCount();
+ assertEquals(0, hits);
+ assertEquals(10, requests);
+
+ for (int i = 0; i < 100; i++)
+ {
+ assertRows(execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)),
+ new Object[]{ String.valueOf(i) + '-' + String.valueOf(0) });
+ }
+
+ hits = metrics.hits.getCount();
+ requests = metrics.requests.getCount();
+ assertEquals(10, hits);
+ assertEquals(120, requests);
+ }
+
+ @Test
+ public void testKeyCacheClustered() throws Throwable
+ {
+ String table = createTable("CREATE TABLE %s ("
+ + commonColumnsDef
+ + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+ insertData(table, null, true);
+ clearCache();
+
+ // query on partition key
+
+ // 10 queries, each 50 result rows
+ for (int i = 0; i < 10; i++)
+ {
+ assertEquals(50, execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)).size());
+ }
+
+ CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+ long hits = metrics.hits.getCount();
+ long requests = metrics.requests.getCount();
+ assertEquals(0, hits);
+ assertEquals(10, requests);
+
+ // 10 queries, each 50 result rows
+ for (int i = 0; i < 10; i++)
+ {
+ assertEquals(50, execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)).size());
+ }
+
+ metrics = CacheService.instance.keyCache.getMetrics();
+ hits = metrics.hits.getCount();
+ requests = metrics.requests.getCount();
+ assertEquals(10, hits);
+ assertEquals(10 + 10, requests);
+
+ // 100 queries - must get a hit in key-cache
+ for (int i = 0; i < 10; i++)
+ {
+ for (int c = 0; c < 10; c++)
+ {
+ assertRows(execute("SELECT col_text, col_long FROM %s WHERE part_key_a = ? AND part_key_b = ? and clust_key_a = ?", i, Integer.toOctalString(i), c),
+ new Object[]{ String.valueOf(i) + '-' + String.valueOf(c), (long) c });
+ }
+ }
+
+ metrics = CacheService.instance.keyCache.getMetrics();
+ hits = metrics.hits.getCount();
+ requests = metrics.requests.getCount();
+ assertEquals(10 + 100, hits);
+ assertEquals(20 + 100, requests);
+
+ // 5000 queries - first 10 partitions already in key cache
+ for (int i = 0; i < 100; i++)
+ {
+ for (int c = 0; c < 50; c++)
+ {
+ assertRows(execute("SELECT col_text, col_long FROM %s WHERE part_key_a = ? AND part_key_b = ? and clust_key_a = ?", i, Integer.toOctalString(i), c),
+ new Object[]{ String.valueOf(i) + '-' + String.valueOf(c), (long) c });
+ }
+ }
+
+ hits = metrics.hits.getCount();
+ requests = metrics.requests.getCount();
+ assertEquals(110 + 4910, hits);
+ assertEquals(120 + 5500, requests);
+ }
+
// Inserts 100 partitions split over 10 sstables (flush after 10 partitions).
// Clustered tables receive 50 CQL rows per partition.
private void insertData(String table, String index, boolean withClustering) throws Throwable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 517bce6,5b37b2c..65ec420
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@@ -28,10 -24,14 +28,11 @@@ import org.junit.BeforeClass
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@@ -68,42 -63,40 +69,44 @@@ public class CounterCacheTes
@Test
public void testReadWrite()
{
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+ cfs.truncateBlocking();
CacheService.instance.invalidateCounterCache();
- assertEquals(0, CacheService.instance.counterCache.size());
- assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
- assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
- assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
- assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
+ Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build();
+ Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build();
+ ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));
- cfs.putCachedCounter(bytes(1), cellname(1), ClockAndCount.create(1L, 1L));
- cfs.putCachedCounter(bytes(1), cellname(2), ClockAndCount.create(1L, 2L));
- cfs.putCachedCounter(bytes(2), cellname(1), ClockAndCount.create(2L, 1L));
- cfs.putCachedCounter(bytes(2), cellname(2), ClockAndCount.create(2L, 2L));
-
- assertEquals(4, CacheService.instance.counterCache.size());
- assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
- assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
- assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
- assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
+ assertEquals(0, CacheService.instance.counterCache.size());
+ assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null));
+ assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null));
+ assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null));
+ assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null));
+
+ cfs.putCachedCounter(bytes(1), c1, cd, null, ClockAndCount.create(1L, 1L));
+ cfs.putCachedCounter(bytes(1), c2, cd, null, ClockAndCount.create(1L, 2L));
+ cfs.putCachedCounter(bytes(2), c1, cd, null, ClockAndCount.create(2L, 1L));
+ cfs.putCachedCounter(bytes(2), c2, cd, null, ClockAndCount.create(2L, 2L));
+
+ assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd, null));
+ assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd, null));
+ assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), c1, cd, null));
+ assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), c2, cd, null));
}
@Test
public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
{
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+ cfs.truncateBlocking();
CacheService.instance.invalidateCounterCache();
- ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
- cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
- new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
- new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
+ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
+ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
+ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+
+ assertEquals(4, CacheService.instance.counterCache.size());
// flush the counter cache and invalidate
CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
@@@ -111,16 -104,76 +114,79 @@@
assertEquals(0, CacheService.instance.counterCache.size());
// load from cache and validate
- CacheService.instance.counterCache.loadSaved(cfs);
+ CacheService.instance.counterCache.loadSaved();
assertEquals(4, CacheService.instance.counterCache.size());
- assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
- assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
- assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
- assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
+
+ Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build();
+ Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build();
+ ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));
+
+ assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd, null));
+ assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd, null));
+ assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), c1, cd, null));
+ assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), c2, cd, null));
}
+
+ @Test
+ public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+ {
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+ cfs.truncateBlocking();
+ CacheService.instance.invalidateCounterCache();
+
- ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
- cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
- new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
- new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
++ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
++ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+
+ // flush the counter cache and invalidate
+ CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+ CacheService.instance.invalidateCounterCache();
+ assertEquals(0, CacheService.instance.counterCache.size());
+
+ Keyspace ks = Schema.instance.removeKeyspaceInstance(KEYSPACE1);
+
+ try
+ {
+ // load from cache and validate
+ CacheService.instance.counterCache.loadSaved();
+ assertEquals(0, CacheService.instance.counterCache.size());
+ }
+ finally
+ {
+ Schema.instance.storeKeyspaceInstance(ks);
+ }
+ }
+
+ @Test
+ public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+ {
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+ cfs.truncateBlocking();
+ CacheService.instance.invalidateCounterCache();
+
- ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
- cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
- new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
- new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
++ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
++ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++ new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+
+ // flush the counter cache and invalidate
+ CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+ CacheService.instance.invalidateCounterCache();
+ assertEquals(0, CacheService.instance.counterCache.size());
+
+
+ CacheService.instance.setCounterCacheCapacityInMB(0);
+ try
+ {
+ // load from cache and validate
+ CacheService.instance.counterCache.loadSaved();
+ assertEquals(0, CacheService.instance.counterCache.size());
+ }
+ finally
+ {
+ CacheService.instance.setCounterCacheCapacityInMB(1);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java
index be22b45,5912d7c..d407f7a
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@@ -71,53 -72,6 +71,53 @@@ public class RowCacheTes
}
@Test
+ public void testRoundTrip() throws Exception
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE_CACHED);
+ String cf = "CachedIntCF";
+ ColumnFamilyStore cachedStore = keyspace.getColumnFamilyStore(cf);
+ long startRowCacheHits = cachedStore.metric.rowCacheHit.getCount();
+ long startRowCacheOutOfRange = cachedStore.metric.rowCacheHitOutOfRange.getCount();
+ // empty the row cache
+ CacheService.instance.invalidateRowCache();
+
+ // set global row cache size to 1 MB
+ CacheService.instance.setRowCacheCapacityInMB(1);
+
+ ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
+ DecoratedKey dk = cachedStore.decorateKey(key);
- RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
++ RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
+
+ RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
+ rub.clustering(String.valueOf(0));
+ rub.add("val", ByteBufferUtil.bytes("val" + 0));
+ rub.build().applyUnsafe();
+
+ // populate row cache, we should not get a row cache hit;
+ Util.getAll(Util.cmd(cachedStore, dk).withLimit(1).build());
+ assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());
+
+ // do another query, limit is 20, which is < 100 that we cache, we should get a hit and it should be in range
+ Util.getAll(Util.cmd(cachedStore, dk).withLimit(1).build());
+ assertEquals(++startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());
+ assertEquals(startRowCacheOutOfRange, cachedStore.metric.rowCacheHitOutOfRange.getCount());
+
+ CachedPartition cachedCf = (CachedPartition)CacheService.instance.rowCache.get(rck);
+ assertEquals(1, cachedCf.rowCount());
+ for (Unfiltered unfiltered : Util.once(cachedCf.unfilteredIterator(ColumnFilter.selection(cachedCf.columns()), Slices.ALL, false)))
+ {
+ Row r = (Row) unfiltered;
+ for (ColumnData c : r)
+ {
+ assertEquals(((Cell)c).value(), ByteBufferUtil.bytes("val" + 0));
+ }
+ }
+ cachedStore.truncateBlocking();
+ }
+
+ @Test
public void testRowCache() throws Exception
{
CompactionManager.instance.disableAutoCompaction();
@@@ -254,21 -231,18 +289,21 @@@
CacheService.instance.setRowCacheCapacityInMB(1);
ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
- DecoratedKey dk = cachedStore.partitioner.decorateKey(key);
+ DecoratedKey dk = cachedStore.decorateKey(key);
- RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
+ RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
- Mutation mutation = new Mutation(KEYSPACE_CACHED, key);
+ String values[] = new String[200];
for (int i = 0; i < 200; i++)
- mutation.add(cf, Util.cellname(i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
- mutation.applyUnsafe();
+ {
+ RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
+ rub.clustering(String.valueOf(i));
+ values[i] = "val" + i;
+ rub.add("val", ByteBufferUtil.bytes(values[i]));
+ rub.build().applyUnsafe();
+ }
+ Arrays.sort(values);
// populate row cache, we should not get a row cache hit;
- cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf,
- Composites.EMPTY,
- Composites.EMPTY,
- false, 10, System.currentTimeMillis()));
+ Util.getAll(Util.cmd(cachedStore, dk).withLimit(10).build());
assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());
// do another query, limit is 20, which is < 100 that we cache, we should get a hit and it should be in range
@@@ -335,19 -309,6 +370,19 @@@
// empty the cache again to make sure values came from disk
CacheService.instance.invalidateRowCache();
assertEquals(0, CacheService.instance.rowCache.size());
- assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved(store));
+ assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved());
}
+
+ private static void readData(String keyspace, String columnFamily, int offset, int numberOfRows)
+ {
+ ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
+ CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+
+ for (int i = offset; i < offset + numberOfRows; i++)
+ {
+ DecoratedKey key = Util.dk("key" + i);
+ Clustering cl = new Clustering(ByteBufferUtil.bytes("col" + i));
+ Util.getAll(Util.cmd(store, key).build());
+ }
+ }
}