You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/01/21 20:55:38 UTC
cassandra git commit: Serializing Row cache alternative,
fully off heap
Repository: cassandra
Updated Branches:
refs/heads/trunk 732986bbd -> dfd78d0e9
Serializing Row cache alternative, fully off heap
Patch by Robert Stupp, reviewed by Ariel Weisberg for CASSANDRA-7438
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfd78d0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfd78d0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfd78d0e
Branch: refs/heads/trunk
Commit: dfd78d0e90b9995872b00f6f33afc180f2b7cba1
Parents: 732986b
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jan 21 20:50:35 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jan 21 20:50:35 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NOTICE.txt | 5 +
build.xml | 3 +-
conf/cassandra.yaml | 22 +-
lib/licenses/ohc-0.2.1.txt | 201 ++++++++++++++
lib/ohc-core-0.3.1.jar | Bin 0 -> 143623 bytes
lib/ohc-core-j8-0.3.1.jar | Bin 0 -> 5026 bytes
.../apache/cassandra/cache/AutoSavingCache.java | 42 ++-
.../apache/cassandra/cache/CacheProvider.java | 23 ++
.../cache/ConcurrentLinkedHashCache.java | 12 +-
src/java/org/apache/cassandra/cache/ICache.java | 6 +-
.../cassandra/cache/InstrumentingCache.java | 24 +-
.../cassandra/cache/NopCacheProvider.java | 93 +++++++
.../org/apache/cassandra/cache/OHCProvider.java | 274 +++++++++++++++++++
.../org/apache/cassandra/cache/RowCacheKey.java | 6 +
.../cassandra/cache/SerializingCache.java | 10 +-
.../cache/SerializingCacheProvider.java | 7 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 15 +-
.../config/YamlConfigurationLoader.java | 2 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 12 +-
.../apache/cassandra/service/CacheService.java | 23 +-
test/conf/cassandra.yaml | 2 +
.../org/apache/cassandra/db/KeyCacheTest.java | 9 +-
.../org/apache/cassandra/db/RowCacheTest.java | 14 +-
25 files changed, 731 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cc6478e..99a5d51 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
* Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
* Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
* Support direct buffer decompression for reads (CASSANDRA-8464)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index 2fe15f5..43d4514 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -66,3 +66,8 @@ Javassist
SIGAR
http://sigar.hyperic.com/
+
+OHC
+(https://github.com/snazy/ohc)
+Java Off-Heap-Cache, licensed under APLv2
+Copyright 2014-2015 Robert Stupp, Germany.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index baf6a77..fccd009 100644
--- a/build.xml
+++ b/build.xml
@@ -375,6 +375,7 @@
<dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.2" />
<dependency groupId="org.javassist" artifactId="javassist" version="3.18.2-GA" />
+ <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.3.1" />
<dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
<dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations" version="1.2.0" />
<dependency groupId="org.fusesource" artifactId="sigar" version="1.6.4">
@@ -424,10 +425,10 @@
<dependency groupId="org.antlr" artifactId="antlr"/>
<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
<dependency groupId="org.javassist" artifactId="javassist"/>
+ <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
<dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
<dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/>
<dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/>
- <dependency groupId="org.javassist" artifactId="javassist" />
</artifact:pom>
<artifact:pom id="coverage-deps-pom"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 24bab09..e1bfca6 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -181,15 +181,25 @@ key_cache_save_period: 14400
# Disabled by default, meaning all keys are going to be saved
# key_cache_keys_to_save: 100
+# Row cache implementation class name.
+# Available implementations:
+# org.apache.cassandra.cache.OHCProvider Fully off-heap row cache implementation (default).
+# org.apache.cassandra.cache.SerializingCacheProvider This is the row cache implementation available
+# in previous releases of Cassandra.
+# row_cache_class_name: org.apache.cassandra.cache.OHCProvider
+
# Maximum size of the row cache in memory.
-# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup.
+# Please note that OHC cache implementation requires some additional off-heap memory to manage
+# the map structures and some in-flight memory during operations before/after cache entries can be
+# accounted against the cache capacity. This overhead is usually small compared to the whole capacity.
+# Do not specify more memory than the system can afford in the worst usual situation and leave some
+# headroom for OS block level cache. Never allow your system to swap.
#
# Default value is 0, to disable row caching.
row_cache_size_in_mb: 0
-# Duration in seconds after which Cassandra should
-# save the row cache. Caches are saved to saved_caches_directory as specified
-# in this configuration file.
+# Duration in seconds after which Cassandra should save the row cache.
+# Caches are saved to saved_caches_directory as specified in this configuration file.
#
# Saved caches greatly improve cold-start speeds, and is relatively cheap in
# terms of I/O for the key cache. Row cache saving is much more expensive and
@@ -198,8 +208,8 @@ row_cache_size_in_mb: 0
# Default is 0 to disable saving the row cache.
row_cache_save_period: 0
-# Number of keys from the row cache to save
-# Disabled by default, meaning all keys are going to be saved
+# Number of keys from the row cache to save.
+# Specify 0 (which is the default), meaning all keys are going to be saved
# row_cache_keys_to_save: 100
# Maximum size of the counter cache in memory.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/lib/licenses/ohc-0.2.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/ohc-0.2.1.txt b/lib/licenses/ohc-0.2.1.txt
new file mode 100644
index 0000000..eb6b5d3
--- /dev/null
+++ b/lib/licenses/ohc-0.2.1.txt
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2014 Robert Stupp, Koeln, Germany, robert-stupp.de
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/lib/ohc-core-0.3.1.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-0.3.1.jar b/lib/ohc-core-0.3.1.jar
new file mode 100644
index 0000000..a6d2c9c
Binary files /dev/null and b/lib/ohc-core-0.3.1.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/lib/ohc-core-j8-0.3.1.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-j8-0.3.1.jar b/lib/ohc-core-j8-0.3.1.jar
new file mode 100644
index 0000000..efe7682
Binary files /dev/null and b/lib/ohc-core-j8-0.3.1.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 92e6a6d..fd87631 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -59,7 +59,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
protected volatile ScheduledFuture<?> saveTask;
protected final CacheService.CacheType cacheType;
- private CacheSerializer<K, V> cacheLoader;
+ private final CacheSerializer<K, V> cacheLoader;
private static final String CURRENT_VERSION = "b";
private static volatile IStreamFactory streamFactory = new IStreamFactory()
@@ -177,16 +177,24 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
public class Writer extends CompactionInfo.Holder
{
- private final Set<K> keys;
+ private final Iterator<K> keyIterator;
private final CompactionInfo info;
private long keysWritten;
+ private final long keysEstimate;
protected Writer(int keysToSave)
{
- if (keysToSave >= getKeySet().size())
- keys = getKeySet();
+ int size = size();
+ if (keysToSave >= size || keysToSave == 0)
+ {
+ keyIterator = keyIterator();
+ keysEstimate = size;
+ }
else
- keys = hotKeySet(keysToSave);
+ {
+ keyIterator = hotKeyIterator(keysToSave);
+ keysEstimate = keysToSave;
+ }
OperationType type;
if (cacheType == CacheService.CacheType.KEY_CACHE)
@@ -201,7 +209,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
info = new CompactionInfo(CFMetaData.denseCFMetaData(SystemKeyspace.NAME, cacheType.toString(), BytesType.instance),
type,
0,
- keys.size(),
+ keysEstimate,
"keys");
}
@@ -213,7 +221,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
public CompactionInfo getCompactionInfo()
{
// keyset can change in size, thus total can too
- return info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
+ // TODO need to check for this one... was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
+ return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
}
public void saveCache()
@@ -221,7 +230,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
logger.debug("Deleting old {} files.", cacheType);
deleteOldCacheFiles();
- if (keys.isEmpty())
+ if (!keyIterator.hasNext())
{
logger.debug("Skipping {} save, cache is empty.", cacheType);
return;
@@ -235,8 +244,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
try
{
- for (K key : keys)
+ while (keyIterator.hasNext())
{
+ K key = keyIterator.next();
UUID cfId = key.getCFId();
if (!Schema.instance.hasCF(key.getCFId()))
continue; // the table has been dropped.
@@ -270,10 +280,22 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
}
keysWritten++;
+ if (keysWritten >= keysEstimate)
+ break;
}
}
finally
{
+ if (keyIterator instanceof Closeable)
+ try
+ {
+ ((Closeable)keyIterator).close();
+ }
+ catch (IOException ignored)
+ {
+ // not thrown (by OHC)
+ }
+
for (OutputStream writer : streams.values())
FileUtils.closeQuietly(writer);
}
@@ -290,7 +312,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
logger.error("Unable to rename {} to {}", tmpFile, cacheFile);
}
- logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
private File tempCacheFile(UUID cfId)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CacheProvider.java b/src/java/org/apache/cassandra/cache/CacheProvider.java
new file mode 100644
index 0000000..6a97be3
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/CacheProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cache;
+
+public interface CacheProvider<K, V>
+{
+ ICache<K, V> create();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
index 8182447..bb14055 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.cache;
-import java.util.Set;
+import java.util.Iterator;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
@@ -45,7 +45,7 @@ public class ConcurrentLinkedHashCache<K extends IMeasurableMemory, V extends IM
.concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)
.build();
- return new ConcurrentLinkedHashCache<K, V>(map);
+ return new ConcurrentLinkedHashCache<>(map);
}
public static <K extends IMeasurableMemory, V extends IMeasurableMemory> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity)
@@ -116,14 +116,14 @@ public class ConcurrentLinkedHashCache<K extends IMeasurableMemory, V extends IM
map.remove(key);
}
- public Set<K> keySet()
+ public Iterator<K> keyIterator()
{
- return map.keySet();
+ return map.keySet().iterator();
}
- public Set<K> hotKeySet(int n)
+ public Iterator<K> hotKeyIterator(int n)
{
- return map.descendingKeySetWithLimit(n);
+ return map.descendingKeySetWithLimit(n).iterator();
}
public boolean containsKey(K key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/ICache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ICache.java b/src/java/org/apache/cassandra/cache/ICache.java
index 22dbb16..37b55cd 100644
--- a/src/java/org/apache/cassandra/cache/ICache.java
+++ b/src/java/org/apache/cassandra/cache/ICache.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.cache;
-import java.util.Set;
+import java.util.Iterator;
/**
* This is similar to the Map interface, but requires maintaining a given capacity
@@ -46,9 +46,9 @@ public interface ICache<K, V>
public void clear();
- public Set<K> keySet();
+ public Iterator<K> keyIterator();
- public Set<K> hotKeySet(int n);
+ public Iterator<K> hotKeyIterator(int n);
public boolean containsKey(K key);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/InstrumentingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/InstrumentingCache.java b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
index 311b373..c8728fd 100644
--- a/src/java/org/apache/cassandra/cache/InstrumentingCache.java
+++ b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.cache;
-import java.util.Set;
+import java.util.Iterator;
import org.apache.cassandra.metrics.CacheMetrics;
@@ -26,7 +26,6 @@ import org.apache.cassandra.metrics.CacheMetrics;
*/
public class InstrumentingCache<K, V>
{
- private volatile boolean capacitySetManually;
private final ICache<K, V> map;
private final String type;
@@ -78,20 +77,9 @@ public class InstrumentingCache<K, V>
return map.capacity();
}
- public boolean isCapacitySetManually()
- {
- return capacitySetManually;
- }
-
- public void updateCapacity(long capacity)
- {
- map.setCapacity(capacity);
- }
-
public void setCapacity(long capacity)
{
- updateCapacity(capacity);
- capacitySetManually = true;
+ map.setCapacity(capacity);
}
public int size()
@@ -110,14 +98,14 @@ public class InstrumentingCache<K, V>
metrics = new CacheMetrics(type, map);
}
- public Set<K> getKeySet()
+ public Iterator<K> keyIterator()
{
- return map.keySet();
+ return map.keyIterator();
}
- public Set<K> hotKeySet(int n)
+ public Iterator<K> hotKeyIterator(int n)
{
- return map.hotKeySet(n);
+ return map.hotKeyIterator(n);
}
public boolean containsKey(K key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/NopCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/NopCacheProvider.java b/src/java/org/apache/cassandra/cache/NopCacheProvider.java
new file mode 100644
index 0000000..20f837a
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/NopCacheProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cache;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+public class NopCacheProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
+{
+ public ICache<RowCacheKey, IRowCacheEntry> create()
+ {
+ return new NopCache();
+ }
+
+ private static class NopCache implements ICache<RowCacheKey, IRowCacheEntry>
+ {
+ public long capacity()
+ {
+ return 0;
+ }
+
+ public void setCapacity(long capacity)
+ {
+ }
+
+ public void put(RowCacheKey key, IRowCacheEntry value)
+ {
+ }
+
+ public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value)
+ {
+ return false;
+ }
+
+ public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value)
+ {
+ return false;
+ }
+
+ public IRowCacheEntry get(RowCacheKey key)
+ {
+ return null;
+ }
+
+ public void remove(RowCacheKey key)
+ {
+ }
+
+ public int size()
+ {
+ return 0;
+ }
+
+ public long weightedSize()
+ {
+ return 0;
+ }
+
+ public void clear()
+ {
+ }
+
+ public Iterator<RowCacheKey> hotKeyIterator(int n)
+ {
+ return Collections.emptyIterator();
+ }
+
+ public Iterator<RowCacheKey> keyIterator()
+ {
+ return Collections.emptyIterator();
+ }
+
+ public boolean containsKey(RowCacheKey key)
+ {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
new file mode 100644
index 0000000..365ca41
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.net.MessagingService;
+import org.caffinitas.ohc.OHCache;
+import org.caffinitas.ohc.OHCacheBuilder;
+
+public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
+{
+ public ICache<RowCacheKey, IRowCacheEntry> create() // builds an OHCache with the serializers below and wraps it as an ICache
+ {
+ OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder();
+ builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024) // configured MB scaled by 1024 * 1024; the getter returns long, so the product is computed in long arithmetic
+ .keySerializer(new KeySerializer())
+ .valueSerializer(new ValueSerializer())
+ .throwOOME(true); // NOTE(review): presumably makes OHC throw OutOfMemoryError when an off-heap allocation fails - confirm against the OHC documentation
+
+ return new OHCacheAdapter(builder.build());
+ }
+
+ private static class OHCacheAdapter implements ICache<RowCacheKey, IRowCacheEntry>
+ {
+ private final OHCache<RowCacheKey, IRowCacheEntry> ohCache;
+
+ public OHCacheAdapter(OHCache<RowCacheKey, IRowCacheEntry> ohCache)
+ {
+ this.ohCache = ohCache;
+ }
+
+ public long capacity()
+ {
+ return ohCache.capacity();
+ }
+
+ public void setCapacity(long capacity)
+ {
+ ohCache.setCapacity(capacity);
+ }
+
+ public void put(RowCacheKey key, IRowCacheEntry value)
+ {
+ ohCache.put(key, value);
+ }
+
+ public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value)
+ {
+ return ohCache.putIfAbsent(key, value);
+ }
+
+ public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value)
+ {
+ return ohCache.addOrReplace(key, old, value);
+ }
+
+ public IRowCacheEntry get(RowCacheKey key)
+ {
+ return ohCache.get(key);
+ }
+
+ public void remove(RowCacheKey key)
+ {
+ ohCache.remove(key);
+ }
+
+ public int size()
+ {
+ return (int) ohCache.size();
+ }
+
+ public long weightedSize()
+ {
+ return ohCache.size();
+ }
+
+ public void clear()
+ {
+ ohCache.clear();
+ }
+
+ public Iterator<RowCacheKey> hotKeyIterator(int n)
+ {
+ return ohCache.hotKeyIterator(n);
+ }
+
+ public Iterator<RowCacheKey> keyIterator()
+ {
+ return ohCache.keyIterator();
+ }
+
+ public boolean containsKey(RowCacheKey key)
+ {
+ return ohCache.containsKey(key);
+ }
+ }
+
+ private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey> // (de)serializes RowCacheKey as: cfId msb, cfId lsb, key length, key bytes
+ {
+ public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException // writes the layout described on the class line
+ {
+ dataOutput.writeLong(rowCacheKey.cfId.getMostSignificantBits()); // 8 bytes
+ dataOutput.writeLong(rowCacheKey.cfId.getLeastSignificantBits()); // 8 bytes
+ dataOutput.writeInt(rowCacheKey.key.length); // 4-byte length prefix
+ dataOutput.write(rowCacheKey.key); // raw key bytes
+ }
+
+ public RowCacheKey deserialize(DataInput dataInput) throws IOException // reads the fields back in the order serialize() wrote them
+ {
+ long msb = dataInput.readLong();
+ long lsb = dataInput.readLong();
+ byte[] key = new byte[dataInput.readInt()]; // array sized from the length prefix
+ dataInput.readFully(key);
+ return new RowCacheKey(new UUID(msb, lsb), key);
+ }
+
+ public int serializedSize(RowCacheKey rowCacheKey) // must match the number of bytes serialize() writes
+ {
+ return 20 + rowCacheKey.key.length; // 20 = 8 (msb) + 8 (lsb) + 4 (length prefix)
+ }
+ }
+
+ private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry> // (de)serializes an entry as a boolean "is sentinel" tag followed by the payload
+ {
+ public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException // writes the tag, then either the sentinel id or the serialized ColumnFamily
+ {
+ assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
+ boolean isSentinel = entry instanceof RowCacheSentinel;
+ out.writeBoolean(isSentinel); // tag that deserialize() reads first
+ if (isSentinel)
+ out.writeLong(((RowCacheSentinel) entry).sentinelId);
+ else
+ ColumnFamily.serializer.serialize((ColumnFamily) entry, new DataOutputPlusAdapter(out), MessagingService.current_version); // the adapter exposes the plain DataOutput as a DataOutputPlus
+ }
+
+ public IRowCacheEntry deserialize(DataInput in) throws IOException // reads the tag and rebuilds a RowCacheSentinel or a ColumnFamily accordingly
+ {
+ boolean isSentinel = in.readBoolean();
+ if (isSentinel)
+ return new RowCacheSentinel(in.readLong());
+ return ColumnFamily.serializer.deserialize(in, MessagingService.current_version); // same version constant that serialize() passes
+ }
+
+ public int serializedSize(IRowCacheEntry entry) // mirrors serialize(): size of the boolean tag plus size of the payload
+ {
+ TypeSizes typeSizes = TypeSizes.NATIVE;
+ int size = typeSizes.sizeof(true); // the boolean tag
+ if (entry instanceof RowCacheSentinel)
+ size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId); // the long sentinel id
+ else
+ size += ColumnFamily.serializer.serializedSize((ColumnFamily) entry, typeSizes, MessagingService.current_version);
+ return size;
+ }
+ }
+
+ static class DataOutputPlusAdapter implements DataOutputPlus
+ {
+ private final DataOutput out;
+
+ public void write(byte[] b) throws IOException
+ {
+ out.write(b);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ out.write(b, off, len);
+ }
+
+ public void write(int b) throws IOException
+ {
+ out.write(b);
+ }
+
+ public void writeBoolean(boolean v) throws IOException
+ {
+ out.writeBoolean(v);
+ }
+
+ public void writeByte(int v) throws IOException
+ {
+ out.writeByte(v);
+ }
+
+ public void writeBytes(String s) throws IOException
+ {
+ out.writeBytes(s);
+ }
+
+ public void writeChar(int v) throws IOException
+ {
+ out.writeChar(v);
+ }
+
+ public void writeChars(String s) throws IOException
+ {
+ out.writeChars(s);
+ }
+
+ public void writeDouble(double v) throws IOException
+ {
+ out.writeDouble(v);
+ }
+
+ public void writeFloat(float v) throws IOException
+ {
+ out.writeFloat(v);
+ }
+
+ public void writeInt(int v) throws IOException
+ {
+ out.writeInt(v);
+ }
+
+ public void writeLong(long v) throws IOException
+ {
+ out.writeLong(v);
+ }
+
+ public void writeShort(int v) throws IOException
+ {
+ out.writeShort(v);
+ }
+
+ public void writeUTF(String s) throws IOException
+ {
+ out.writeUTF(s);
+ }
+
+ public DataOutputPlusAdapter(DataOutput out)
+ {
+ this.out = out;
+ }
+
+ public void write(ByteBuffer buffer) throws IOException
+ {
+ if (buffer.hasArray())
+ out.write(buffer.array(), buffer.arrayOffset(), buffer.remaining());
+ else
+ throw new UnsupportedOperationException("IMPLEMENT ME");
+ }
+
+ public void write(Memory memory) throws IOException
+ {
+ throw new UnsupportedOperationException("IMPLEMENT ME");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index af2d4d4..ccb85d8 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -33,6 +33,12 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ public RowCacheKey(UUID cfId, byte[] key)
+ {
+ this.cfId = cfId;
+ this.key = key;
+ }
+
public RowCacheKey(UUID cfId, DecoratedKey key)
{
this(cfId, key.getKey());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index ca65fcc..911b500 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -18,7 +18,7 @@
package org.apache.cassandra.cache;
import java.io.IOException;
-import java.util.Set;
+import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -264,14 +264,14 @@ public class SerializingCache<K, V> implements ICache<K, V>
mem.unreference();
}
- public Set<K> keySet()
+ public Iterator<K> keyIterator()
{
- return map.keySet();
+ return map.keySet().iterator();
}
- public Set<K> hotKeySet(int n)
+ public Iterator<K> hotKeyIterator(int n)
{
- return map.descendingKeySetWithLimit(n);
+ return map.descendingKeySetWithLimit(n).iterator();
}
public boolean containsKey(K key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index a058872..f540322 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -20,17 +20,18 @@ package org.apache.cassandra.cache;
import java.io.DataInput;
import java.io.IOException;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
-public class SerializingCacheProvider
+public class SerializingCacheProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
{
- public ICache<RowCacheKey, IRowCacheEntry> create(long capacity)
+ public ICache<RowCacheKey, IRowCacheEntry> create()
{
- return SerializingCache.create(capacity, new RowCacheSerializer());
+ return SerializingCache.create(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024, new RowCacheSerializer());
}
// Package protected for tests
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 33d2bb2..f42c980 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
public volatile int key_cache_save_period = 14400;
public volatile int key_cache_keys_to_save = Integer.MAX_VALUE;
+ public String row_cache_class_name = "org.apache.cassandra.cache.OHCProvider";
public long row_cache_size_in_mb = 0;
public volatile int row_cache_save_period = 0;
public volatile int row_cache_keys_to_save = Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6d626da..8cc2da4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1423,6 +1423,11 @@ public class DatabaseDescriptor
conf.key_cache_keys_to_save = keyCacheKeysToSave;
}
+ public static String getRowCacheClassName()
+ {
+ return conf.row_cache_class_name;
+ }
+
public static long getRowCacheSizeInMB()
{
return conf.row_cache_size_in_mb;
@@ -1448,6 +1453,11 @@ public class DatabaseDescriptor
return counterCacheSizeInMB;
}
+ public static void setRowCacheKeysToSave(int rowCacheKeysToSave)
+ {
+ conf.row_cache_keys_to_save = rowCacheKeysToSave;
+ }
+
public static int getCounterCacheSavePeriod()
{
return conf.counter_cache_save_period;
@@ -1473,11 +1483,6 @@ public class DatabaseDescriptor
return memoryAllocator;
}
- public static void setRowCacheKeysToSave(int rowCacheKeysToSave)
- {
- conf.row_cache_keys_to_save = rowCacheKeysToSave;
- }
-
public static int getStreamingSocketTimeout()
{
return conf.streaming_socket_timeout_in_ms;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 50991f2..82c8151 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -102,7 +102,7 @@ public class YamlConfigurationLoader implements ConfigurationLoader
}
logConfig(configBytes);
-
+
org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
seedDesc.putMapPropertyType("parameters", String.class, String.class);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 002238c..c2ee0ac 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1961,19 +1961,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
- for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+ for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
+ keyIter.hasNext(); )
{
+ RowCacheKey key = keyIter.next();
DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
- if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
+ if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
invalidateCachedRow(dk);
}
if (metadata.isCounter())
{
- for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+ for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator();
+ keyIter.hasNext(); )
{
+ CounterCacheKey key = keyIter.next();
DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
- if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
+ if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
CacheService.instance.counterCache.remove(key);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 2ffd954..fb8153c 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -33,6 +33,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.util.concurrent.Futures;
+
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,10 +133,22 @@ public class CacheService implements CacheServiceMBean
{
logger.info("Initializing row cache with capacity of {} MBs", DatabaseDescriptor.getRowCacheSizeInMB());
- long rowCacheInMemoryCapacity = DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024;
+ CacheProvider<RowCacheKey, IRowCacheEntry> cacheProvider;
+ String cacheProviderClassName = DatabaseDescriptor.getRowCacheSizeInMB() > 0
+ ? DatabaseDescriptor.getRowCacheClassName() : "org.apache.cassandra.cache.NopCacheProvider";
+ try
+ {
+ Class<CacheProvider<RowCacheKey, IRowCacheEntry>> cacheProviderClass =
+ (Class<CacheProvider<RowCacheKey, IRowCacheEntry>>) Class.forName(cacheProviderClassName);
+ cacheProvider = cacheProviderClass.newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Cannot find configured row cache provider class " + DatabaseDescriptor.getRowCacheClassName());
+ }
// cache object
- ICache<RowCacheKey, IRowCacheEntry> rc = new SerializingCacheProvider().create(rowCacheInMemoryCapacity);
+ ICache<RowCacheKey, IRowCacheEntry> rc = cacheProvider.create();
AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<>(rc, CacheType.ROW_CACHE, new RowCacheSerializer());
int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
@@ -285,7 +298,7 @@ public class CacheService implements CacheServiceMBean
public void invalidateKeyCacheForCf(UUID cfId)
{
- Iterator<KeyCacheKey> keyCacheIterator = keyCache.getKeySet().iterator();
+ Iterator<KeyCacheKey> keyCacheIterator = keyCache.keyIterator();
while (keyCacheIterator.hasNext())
{
KeyCacheKey key = keyCacheIterator.next();
@@ -301,7 +314,7 @@ public class CacheService implements CacheServiceMBean
public void invalidateRowCacheForCf(UUID cfId)
{
- Iterator<RowCacheKey> rowCacheIterator = rowCache.getKeySet().iterator();
+ Iterator<RowCacheKey> rowCacheIterator = rowCache.keyIterator();
while (rowCacheIterator.hasNext())
{
RowCacheKey rowCacheKey = rowCacheIterator.next();
@@ -312,7 +325,7 @@ public class CacheService implements CacheServiceMBean
public void invalidateCounterCacheForCf(UUID cfId)
{
- Iterator<CounterCacheKey> counterCacheIterator = counterCache.getKeySet().iterator();
+ Iterator<CounterCacheKey> counterCacheIterator = counterCache.keyIterator();
while (counterCacheIterator.hasNext())
{
CounterCacheKey counterCacheKey = counterCacheIterator.next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index ec988e2..307ca8c 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -36,3 +36,5 @@ server_encryption_options:
incremental_backups: true
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 0
+row_cache_class_name: org.apache.cassandra.cache.OHCProvider
+row_cache_size_in_mb: 16
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index e31b439..c370e4f 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -88,8 +89,10 @@ public class KeyCacheTest
// really? our caches don't implement the map interface? (hence no .addAll)
Map<KeyCacheKey, RowIndexEntry> savedMap = new HashMap<KeyCacheKey, RowIndexEntry>();
- for (KeyCacheKey k : CacheService.instance.keyCache.getKeySet())
+ for (Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
+ iter.hasNext();)
{
+ KeyCacheKey k = iter.next();
if (k.desc.ksname.equals(KEYSPACE1) && k.desc.cfname.equals(COLUMN_FAMILY2))
savedMap.put(k, CacheService.instance.keyCache.get(k));
}
@@ -207,8 +210,10 @@ public class KeyCacheTest
private void assertKeyCacheSize(int expected, String keyspace, String columnFamily)
{
int size = 0;
- for (KeyCacheKey k : CacheService.instance.keyCache.getKeySet())
+ for (Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
+ iter.hasNext();)
{
+ KeyCacheKey k = iter.next();
if (k.desc.ksname.equals(keyspace) && k.desc.cfname.equals(columnFamily))
size++;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 7d5799a..3d5617f 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -156,9 +156,9 @@ public class RowCacheTest
rowCacheLoad(100, Integer.MAX_VALUE, 1000);
ColumnFamilyStore store = Keyspace.open(KEYSPACE_CACHED).getColumnFamilyStore(CF_CACHED);
- assertEquals(CacheService.instance.rowCache.getKeySet().size(), 100);
+ assertEquals(CacheService.instance.rowCache.size(), 100);
store.cleanupCache();
- assertEquals(CacheService.instance.rowCache.getKeySet().size(), 100);
+ assertEquals(CacheService.instance.rowCache.size(), 100);
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
byte[] tk1, tk2;
tk1 = "key1000".getBytes();
@@ -166,7 +166,7 @@ public class RowCacheTest
tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
store.cleanupCache();
- assertEquals(CacheService.instance.rowCache.getKeySet().size(), 50);
+ assertEquals(50, CacheService.instance.rowCache.size());
CacheService.instance.setRowCacheCapacityInMB(0);
}
@@ -259,19 +259,19 @@ public class RowCacheTest
// empty the cache
CacheService.instance.invalidateRowCache();
- assert CacheService.instance.rowCache.size() == 0;
+ assertEquals(0, CacheService.instance.rowCache.size());
// insert data and fill the cache
SchemaLoader.insertData(KEYSPACE_CACHED, CF_CACHED, offset, totalKeys);
SchemaLoader.readData(KEYSPACE_CACHED, CF_CACHED, offset, totalKeys);
- assert CacheService.instance.rowCache.size() == totalKeys;
+ assertEquals(totalKeys, CacheService.instance.rowCache.size());
// force the cache to disk
CacheService.instance.rowCache.submitWrite(keysToSave).get();
// empty the cache again to make sure values came from disk
CacheService.instance.invalidateRowCache();
- assert CacheService.instance.rowCache.size() == 0;
- assert CacheService.instance.rowCache.loadSaved(store) == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave);
+ assertEquals(0, CacheService.instance.rowCache.size());
+ assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved(store));
}
}