You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/05 01:16:56 UTC

git commit: Make cache serializers pluggable

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 a446e80e0 -> 0a0ba84b8


Make cache serializers pluggable

patch by Blake Eggleston; reviewed by Aleksey Yeschenko for
CASSANDRA-8096


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a0ba84b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a0ba84b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a0ba84b

Branch: refs/heads/cassandra-2.1
Commit: 0a0ba84b87046a51338021ca56acdba4b40bd69a
Parents: a446e80
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Oct 24 12:16:00 2014 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 5 03:16:00 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 68 +++++++++++++++-----
 2 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0ba84b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ddbc810..42cef8c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Make cache serializers pluggable (CASSANDRA-8096)
  * Fix issues with CONTAINS (KEY) queries on secondary indexes
    (CASSANDRA-8147)
  * Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0ba84b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 1a861f8..fca939a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -37,10 +37,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.LengthAvailableInputStream;
-import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -48,6 +45,12 @@ import org.apache.cassandra.utils.Pair;
 
 public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
 {
+    public interface IStreamFactory
+    {
+        public InputStream getInputStream(File path) throws FileNotFoundException;
+        public OutputStream getOutputStream(File path) throws FileNotFoundException;
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
 
     /** True if a cache flush is currently executing: only one may execute at a time. */
@@ -59,6 +62,25 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     private CacheSerializer<K, V> cacheLoader;
     private static final String CURRENT_VERSION = "b";
 
+    private static volatile IStreamFactory streamFactory = new IStreamFactory()
+    {
+        public InputStream getInputStream(File path) throws FileNotFoundException
+        {
+            return new FileInputStream(path);
+        }
+
+        public OutputStream getOutputStream(File path) throws FileNotFoundException
+        {
+            return new FileOutputStream(path);
+        }
+    };
+
+    // Unused, but exposed for a reason. See CASSANDRA-8096.
+    public static void setStreamFactory(IStreamFactory streamFactory)
+    {
+        AutoSavingCache.streamFactory = streamFactory;
+    }
+
     public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType, CacheSerializer<K, V> cacheloader)
     {
         super(cacheType.toString(), cache);
@@ -122,7 +144,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             try
             {
                 logger.info(String.format("reading saved cache %s", path));
-                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(new FileInputStream(path)), path.length()));
+                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
                 while (in.available() > 0)
                 {
@@ -216,7 +238,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, SequentialWriter> writers = new HashMap<>();
+            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+            HashMap<UUID, OutputStream> streams = new HashMap<>();
+            HashMap<UUID, File> paths = new HashMap<>();
 
             try
             {
@@ -226,20 +250,32 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!Schema.instance.hasCF(key.getCFId()))
                         continue; // the table has been dropped.
 
-                    SequentialWriter writer = writers.get(cfId);
+                    DataOutputPlus writer = writers.get(cfId);
                     if (writer == null)
                     {
-                        writer = tempCacheFile(cfId);
+                        File writerPath = tempCacheFile(cfId);
+                        OutputStream stream;
+                        try
+                        {
+                            stream = streamFactory.getOutputStream(writerPath);
+                            writer = new DataOutputStreamPlus(stream);
+                        }
+                        catch (FileNotFoundException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                        paths.put(cfId, writerPath);
+                        streams.put(cfId, stream);
                         writers.put(cfId, writer);
                     }
 
                     try
                     {
-                        cacheLoader.serialize(key, writer.stream);
+                        cacheLoader.serialize(key, writer);
                     }
                     catch (IOException e)
                     {
-                        throw new FSWriteError(e, writer.getPath());
+                        throw new FSWriteError(e, paths.get(cfId));
                     }
 
                     keysWritten++;
@@ -247,16 +283,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             }
             finally
             {
-                for (SequentialWriter writer : writers.values())
+                for (OutputStream writer : streams.values())
                     FileUtils.closeQuietly(writer);
             }
 
-            for (Map.Entry<UUID, SequentialWriter> entry : writers.entrySet())
+            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
             {
                 UUID cfId = entry.getKey();
-                SequentialWriter writer = entry.getValue();
 
-                File tmpFile = new File(writer.getPath());
+                File tmpFile = paths.get(cfId);
                 File cacheFile = getCachePath(cfId, CURRENT_VERSION);
 
                 cacheFile.delete(); // ignore error if it didn't exist
@@ -267,11 +302,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
         }
 
-        private SequentialWriter tempCacheFile(UUID cfId)
+        private File tempCacheFile(UUID cfId)
         {
             File path = getCachePath(cfId, CURRENT_VERSION);
-            File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
-            return SequentialWriter.open(tmpFile);
+            return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
         }
 
         private void deleteOldCacheFiles()