You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/05 01:22:04 UTC
[1/2] git commit: Make cache serializers pluggable
Repository: cassandra
Updated Branches:
refs/heads/trunk 9bf17e15a -> 87def12cb
Make cache serializers pluggable
patch by Blake Eggleston; reviewed by Aleksey Yeschenko for
CASSANDRA-8096
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a0ba84b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a0ba84b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a0ba84b
Branch: refs/heads/trunk
Commit: 0a0ba84b87046a51338021ca56acdba4b40bd69a
Parents: a446e80
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Oct 24 12:16:00 2014 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 5 03:16:00 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 68 +++++++++++++++-----
2 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0ba84b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ddbc810..42cef8c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.2
+ * Make cache serializers pluggable (CASSANDRA-8096)
* Fix issues with CONTAINS (KEY) queries on secondary indexes
(CASSANDRA-8147)
* Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0ba84b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 1a861f8..fca939a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -37,10 +37,7 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.LengthAvailableInputStream;
-import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -48,6 +45,12 @@ import org.apache.cassandra.utils.Pair;
public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
{
+ public interface IStreamFactory
+ {
+ public InputStream getInputStream(File path) throws FileNotFoundException;
+ public OutputStream getOutputStream(File path) throws FileNotFoundException;
+ }
+
private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
/** True if a cache flush is currently executing: only one may execute at a time. */
@@ -59,6 +62,25 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
private CacheSerializer<K, V> cacheLoader;
private static final String CURRENT_VERSION = "b";
+ private static volatile IStreamFactory streamFactory = new IStreamFactory()
+ {
+ public InputStream getInputStream(File path) throws FileNotFoundException
+ {
+ return new FileInputStream(path);
+ }
+
+ public OutputStream getOutputStream(File path) throws FileNotFoundException
+ {
+ return new FileOutputStream(path);
+ }
+ };
+
+ // Unused, but exposed for a reason. See CASSANDRA-8096.
+ public static void setStreamFactory(IStreamFactory streamFactory)
+ {
+ AutoSavingCache.streamFactory = streamFactory;
+ }
+
public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType, CacheSerializer<K, V> cacheloader)
{
super(cacheType.toString(), cache);
@@ -122,7 +144,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
try
{
logger.info(String.format("reading saved cache %s", path));
- in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(new FileInputStream(path)), path.length()));
+ in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
while (in.available() > 0)
{
@@ -216,7 +238,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
- HashMap<UUID, SequentialWriter> writers = new HashMap<>();
+ HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+ HashMap<UUID, OutputStream> streams = new HashMap<>();
+ HashMap<UUID, File> paths = new HashMap<>();
try
{
@@ -226,20 +250,32 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
if (!Schema.instance.hasCF(key.getCFId()))
continue; // the table has been dropped.
- SequentialWriter writer = writers.get(cfId);
+ DataOutputPlus writer = writers.get(cfId);
if (writer == null)
{
- writer = tempCacheFile(cfId);
+ File writerPath = tempCacheFile(cfId);
+ OutputStream stream;
+ try
+ {
+ stream = streamFactory.getOutputStream(writerPath);
+ writer = new DataOutputStreamPlus(stream);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ paths.put(cfId, writerPath);
+ streams.put(cfId, stream);
writers.put(cfId, writer);
}
try
{
- cacheLoader.serialize(key, writer.stream);
+ cacheLoader.serialize(key, writer);
}
catch (IOException e)
{
- throw new FSWriteError(e, writer.getPath());
+ throw new FSWriteError(e, paths.get(cfId));
}
keysWritten++;
@@ -247,16 +283,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
}
finally
{
- for (SequentialWriter writer : writers.values())
+ for (OutputStream writer : streams.values())
FileUtils.closeQuietly(writer);
}
- for (Map.Entry<UUID, SequentialWriter> entry : writers.entrySet())
+ for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
{
UUID cfId = entry.getKey();
- SequentialWriter writer = entry.getValue();
- File tmpFile = new File(writer.getPath());
+ File tmpFile = paths.get(cfId);
File cacheFile = getCachePath(cfId, CURRENT_VERSION);
cacheFile.delete(); // ignore error if it didn't exist
@@ -267,11 +302,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
- private SequentialWriter tempCacheFile(UUID cfId)
+ private File tempCacheFile(UUID cfId)
{
File path = getCachePath(cfId, CURRENT_VERSION);
- File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
- return SequentialWriter.open(tmpFile);
+ return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
}
private void deleteOldCacheFiles()
[2/2] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87def12c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87def12c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87def12c
Branch: refs/heads/trunk
Commit: 87def12cb1802d6bdb562019249065d3fa93a913
Parents: 9bf17e1 0a0ba84
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Nov 5 03:19:27 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 5 03:19:27 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 68 +++++++++++++++-----
2 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87def12c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7901aff,42cef8c..817cbcf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,37 -1,5 +1,38 @@@
+3.0
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * improve concurrency of repair (CASSANDRA-6455)
+
+
2.1.2
+ * Make cache serializers pluggable (CASSANDRA-8096)
* Fix issues with CONTAINS (KEY) queries on secondary indexes
(CASSANDRA-8147)
* Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87def12c/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------