You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/05 01:22:04 UTC

[1/2] git commit: Make cache serializers pluggable

Repository: cassandra
Updated Branches:
  refs/heads/trunk 9bf17e15a -> 87def12cb


Make cache serializers pluggable

patch by Blake Eggleston; reviewed by Aleksey Yeschenko for
CASSANDRA-8096


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a0ba84b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a0ba84b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a0ba84b

Branch: refs/heads/trunk
Commit: 0a0ba84b87046a51338021ca56acdba4b40bd69a
Parents: a446e80
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Oct 24 12:16:00 2014 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 5 03:16:00 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 68 +++++++++++++++-----
 2 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0ba84b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ddbc810..42cef8c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Make cache serializers pluggable (CASSANDRA-8096)
  * Fix issues with CONTAINS (KEY) queries on secondary indexes
    (CASSANDRA-8147)
  * Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0ba84b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 1a861f8..fca939a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -37,10 +37,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.LengthAvailableInputStream;
-import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -48,6 +45,12 @@ import org.apache.cassandra.utils.Pair;
 
 public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
 {
+    public interface IStreamFactory
+    {
+        public InputStream getInputStream(File path) throws FileNotFoundException;
+        public OutputStream getOutputStream(File path) throws FileNotFoundException;
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
 
     /** True if a cache flush is currently executing: only one may execute at a time. */
@@ -59,6 +62,25 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     private CacheSerializer<K, V> cacheLoader;
     private static final String CURRENT_VERSION = "b";
 
+    private static volatile IStreamFactory streamFactory = new IStreamFactory()
+    {
+        public InputStream getInputStream(File path) throws FileNotFoundException
+        {
+            return new FileInputStream(path);
+        }
+
+        public OutputStream getOutputStream(File path) throws FileNotFoundException
+        {
+            return new FileOutputStream(path);
+        }
+    };
+
+    // Unused, but exposed for a reason. See CASSANDRA-8096.
+    public static void setStreamFactory(IStreamFactory streamFactory)
+    {
+        AutoSavingCache.streamFactory = streamFactory;
+    }
+
     public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType, CacheSerializer<K, V> cacheloader)
     {
         super(cacheType.toString(), cache);
@@ -122,7 +144,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             try
             {
                 logger.info(String.format("reading saved cache %s", path));
-                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(new FileInputStream(path)), path.length()));
+                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
                 while (in.available() > 0)
                 {
@@ -216,7 +238,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, SequentialWriter> writers = new HashMap<>();
+            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+            HashMap<UUID, OutputStream> streams = new HashMap<>();
+            HashMap<UUID, File> paths = new HashMap<>();
 
             try
             {
@@ -226,20 +250,32 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!Schema.instance.hasCF(key.getCFId()))
                         continue; // the table has been dropped.
 
-                    SequentialWriter writer = writers.get(cfId);
+                    DataOutputPlus writer = writers.get(cfId);
                     if (writer == null)
                     {
-                        writer = tempCacheFile(cfId);
+                        File writerPath = tempCacheFile(cfId);
+                        OutputStream stream;
+                        try
+                        {
+                            stream = streamFactory.getOutputStream(writerPath);
+                            writer = new DataOutputStreamPlus(stream);
+                        }
+                        catch (FileNotFoundException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                        paths.put(cfId, writerPath);
+                        streams.put(cfId, stream);
                         writers.put(cfId, writer);
                     }
 
                     try
                     {
-                        cacheLoader.serialize(key, writer.stream);
+                        cacheLoader.serialize(key, writer);
                     }
                     catch (IOException e)
                     {
-                        throw new FSWriteError(e, writer.getPath());
+                        throw new FSWriteError(e, paths.get(cfId));
                     }
 
                     keysWritten++;
@@ -247,16 +283,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             }
             finally
             {
-                for (SequentialWriter writer : writers.values())
+                for (OutputStream writer : streams.values())
                     FileUtils.closeQuietly(writer);
             }
 
-            for (Map.Entry<UUID, SequentialWriter> entry : writers.entrySet())
+            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
             {
                 UUID cfId = entry.getKey();
-                SequentialWriter writer = entry.getValue();
 
-                File tmpFile = new File(writer.getPath());
+                File tmpFile = paths.get(cfId);
                 File cacheFile = getCachePath(cfId, CURRENT_VERSION);
 
                 cacheFile.delete(); // ignore error if it didn't exist
@@ -267,11 +302,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
         }
 
-        private SequentialWriter tempCacheFile(UUID cfId)
+        private File tempCacheFile(UUID cfId)
         {
             File path = getCachePath(cfId, CURRENT_VERSION);
-            File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
-            return SequentialWriter.open(tmpFile);
+            return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
         }
 
         private void deleteOldCacheFiles()


[2/2] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87def12c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87def12c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87def12c

Branch: refs/heads/trunk
Commit: 87def12cb1802d6bdb562019249065d3fa93a913
Parents: 9bf17e1 0a0ba84
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Nov 5 03:19:27 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 5 03:19:27 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 68 +++++++++++++++-----
 2 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/87def12c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7901aff,42cef8c..817cbcf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,37 -1,5 +1,38 @@@
 +3.0
 + * Mark sstables as repaired after full repair (CASSANDRA-7586) 
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * improve concurrency of repair (CASSANDRA-6455)
 +
 +
  2.1.2
+  * Make cache serializers pluggable (CASSANDRA-8096)
   * Fix issues with CONTAINS (KEY) queries on secondary indexes
     (CASSANDRA-8147)
   * Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87def12c/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------