You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/05 01:16:56 UTC
git commit: Make cache serializers pluggable
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 a446e80e0 -> 0a0ba84b8
Make cache serializers pluggable
patch by Blake Eggleston; reviewed by Aleksey Yeschenko for
CASSANDRA-8096
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a0ba84b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a0ba84b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a0ba84b
Branch: refs/heads/cassandra-2.1
Commit: 0a0ba84b87046a51338021ca56acdba4b40bd69a
Parents: a446e80
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Oct 24 12:16:00 2014 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 5 03:16:00 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 68 +++++++++++++++-----
2 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0ba84b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ddbc810..42cef8c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.2
+ * Make cache serializers pluggable (CASSANDRA-8096)
* Fix issues with CONTAINS (KEY) queries on secondary indexes
(CASSANDRA-8147)
* Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0ba84b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 1a861f8..fca939a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -37,10 +37,7 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.LengthAvailableInputStream;
-import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -48,6 +45,12 @@ import org.apache.cassandra.utils.Pair;
public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
{
+ public interface IStreamFactory
+ {
+ public InputStream getInputStream(File path) throws FileNotFoundException;
+ public OutputStream getOutputStream(File path) throws FileNotFoundException;
+ }
+
private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
/** True if a cache flush is currently executing: only one may execute at a time. */
@@ -59,6 +62,25 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
private CacheSerializer<K, V> cacheLoader;
private static final String CURRENT_VERSION = "b";
+ private static volatile IStreamFactory streamFactory = new IStreamFactory()
+ {
+ public InputStream getInputStream(File path) throws FileNotFoundException
+ {
+ return new FileInputStream(path);
+ }
+
+ public OutputStream getOutputStream(File path) throws FileNotFoundException
+ {
+ return new FileOutputStream(path);
+ }
+ };
+
+ // Unused, but exposed for a reason. See CASSANDRA-8096.
+ public static void setStreamFactory(IStreamFactory streamFactory)
+ {
+ AutoSavingCache.streamFactory = streamFactory;
+ }
+
public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType, CacheSerializer<K, V> cacheloader)
{
super(cacheType.toString(), cache);
@@ -122,7 +144,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
try
{
logger.info(String.format("reading saved cache %s", path));
- in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(new FileInputStream(path)), path.length()));
+ in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
while (in.available() > 0)
{
@@ -216,7 +238,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
- HashMap<UUID, SequentialWriter> writers = new HashMap<>();
+ HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+ HashMap<UUID, OutputStream> streams = new HashMap<>();
+ HashMap<UUID, File> paths = new HashMap<>();
try
{
@@ -226,20 +250,32 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
if (!Schema.instance.hasCF(key.getCFId()))
continue; // the table has been dropped.
- SequentialWriter writer = writers.get(cfId);
+ DataOutputPlus writer = writers.get(cfId);
if (writer == null)
{
- writer = tempCacheFile(cfId);
+ File writerPath = tempCacheFile(cfId);
+ OutputStream stream;
+ try
+ {
+ stream = streamFactory.getOutputStream(writerPath);
+ writer = new DataOutputStreamPlus(stream);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ paths.put(cfId, writerPath);
+ streams.put(cfId, stream);
writers.put(cfId, writer);
}
try
{
- cacheLoader.serialize(key, writer.stream);
+ cacheLoader.serialize(key, writer);
}
catch (IOException e)
{
- throw new FSWriteError(e, writer.getPath());
+ throw new FSWriteError(e, paths.get(cfId));
}
keysWritten++;
@@ -247,16 +283,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
}
finally
{
- for (SequentialWriter writer : writers.values())
+ for (OutputStream writer : streams.values())
FileUtils.closeQuietly(writer);
}
- for (Map.Entry<UUID, SequentialWriter> entry : writers.entrySet())
+ for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
{
UUID cfId = entry.getKey();
- SequentialWriter writer = entry.getValue();
- File tmpFile = new File(writer.getPath());
+ File tmpFile = paths.get(cfId);
File cacheFile = getCachePath(cfId, CURRENT_VERSION);
cacheFile.delete(); // ignore error if it didn't exist
@@ -267,11 +302,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
- private SequentialWriter tempCacheFile(UUID cfId)
+ private File tempCacheFile(UUID cfId)
{
File path = getCachePath(cfId, CURRENT_VERSION);
- File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
- return SequentialWriter.open(tmpFile);
+ return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
}
private void deleteOldCacheFiles()