You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/21 11:18:24 UTC

[4/7] cassandra git commit: Revert AutoSavingCache stream factory to OutputStream

Revert AutoSavingCache stream factory to OutputStream

Adds a "non-transactional" flag to SequentialWriter
to convert its semantics to a plain OutputStream

patch by bdeggleston; reviewed by benedict


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d

Branch: refs/heads/cassandra-3.0
Commit: a3fc425dff25c42a49af38e87aa33501d4224195
Parents: 7a85c8b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 14 12:29:27 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:55:42 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
 .../cassandra/io/util/SequentialWriter.java     | 12 +++++++-
 .../utils/concurrent/Transactional.java         |  8 +++---
 .../cassandra/io/util/DataOutputTest.java       |  3 +-
 .../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
 5 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 05653ba..f0f4e8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 {
     public interface IStreamFactory
     {
-        public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
-        public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
+        InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+        OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException;
     }
 
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             return ChecksummedRandomAccessReader.open(dataPath, crcPath);
         }
 
-        public SequentialWriter getOutputWriter(File dataPath, File crcPath)
+        public OutputStream getOutputStream(File dataPath, File crcPath)
         {
-            return SequentialWriter.open(dataPath, crcPath);
+            return SequentialWriter.open(dataPath, crcPath).finishOnClose();
         }
     };
 
@@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
-            HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+            HashMap<UUID, OutputStream> streams = new HashMap<>();
             HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
 
             try
@@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!Schema.instance.hasCF(key.getCFId()))
                         continue; // the table has been dropped.
 
-                    DataOutputPlus writer = dataOutputs.get(cfId);
+                    DataOutputPlus writer = writers.get(cfId);
                     if (writer == null)
                     {
                         Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
-                        SequentialWriter sequentialWriter;
+                        OutputStream stream;
                         try
                         {
-                            sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
-                            writer = new WrappedDataOutputStreamPlus(sequentialWriter);
+                            stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
+                            writer = new WrappedDataOutputStreamPlus(stream);
                         }
                         catch (FileNotFoundException e)
                         {
                             throw new RuntimeException(e);
                         }
                         paths.put(cfId, cacheFilePaths);
-                        sequentialWriters.put(cfId, sequentialWriter);
-                        dataOutputs.put(cfId, writer);
+                        streams.put(cfId, stream);
+                        writers.put(cfId, writer);
                     }
 
                     try
@@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         // not thrown (by OHC)
                     }
 
-                for (SequentialWriter writer : sequentialWriters.values())
+                for (OutputStream writer : streams.values())
                 {
-                    writer.finish();
                     FileUtils.closeQuietly(writer);
                 }
             }
 
-            for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
+            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
             {
                 UUID cfId = entry.getKey();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 915133f..0c39469 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     protected Runnable runPostFlush;
 
     private final TransactionalProxy txnProxy = txnProxy();
+    private boolean finishOnClose;
     protected Descriptor descriptor;
 
     // due to lack of multiple-inheritance, we proxy our transactional implementation
@@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
     }
 
+    public SequentialWriter finishOnClose()
+    {
+        finishOnClose = true;
+        return this;
+    }
+
     public void write(int value) throws ClosedChannelException
     {
         if (buffer == null)
@@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     @Override
     public final void close()
     {
-        txnProxy.close();
+        if (finishOnClose)
+            txnProxy.finish();
+        else
+            txnProxy.close();
     }
 
     public final void finish()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 5b0eb8e..85c3de5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -18,10 +18,6 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge;
  * of the system should be, and so simply logging the exception is likely best (since it may have been an issue
  * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
  * should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of their close() method differ significantly from
+ * those of most AutoCloseable implementations.
  */
 public interface Transactional extends AutoCloseable
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 70993d3..bbdf4e1 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -179,10 +179,9 @@ public class DataOutputTest
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
         final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
-        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
+        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose());
         DataInput canon = testWrite(write);
         write.flush();
-        writer.finish();
         write.close();
         DataInputStream test = new DataInputStream(new FileInputStream(file));
         testRead(test, canon);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ce0f918..fd38427 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -18,6 +18,7 @@
 */
 package org.apache.cassandra.io.util;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
+import com.google.common.io.Files;
 import org.junit.After;
+import org.junit.Test;
 
 import junit.framework.Assert;
 
@@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest
         }
     }
 
+    /**
+     * Tests that the output stream exposed by SequentialWriter behaves as expected
+     */
+    @Test
+    public void outputStream()
+    {
+        File tempFile = new File(Files.createTempDir(), "test.txt");
+        Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
+
+        try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+        {
+            os.writeUTF("123");
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+
+        Assert.assertTrue("temp file should exist", tempFile.exists());
+    }
+
 }