You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/21 11:18:21 UTC

[1/7] cassandra git commit: Re-introduce the use of Verb.RANGE_SLICE for range queries explanation

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 7a85c8b8f -> a3fc425df
  refs/heads/cassandra-3.0 be45eb6be -> 1c27abf6c
  refs/heads/trunk 0fd857baf -> 1e951c983


Re-introduce the use of Verb.RANGE_SLICE for range queries
explanation

patch by slebresne; reviewed by aweisberg for CASSANDRA-10125


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be45eb6b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be45eb6b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be45eb6b

Branch: refs/heads/trunk
Commit: be45eb6bea4a0419dd058e1787a482263b2ff198
Parents: 9ed2727
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Aug 19 11:52:18 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 21 09:58:32 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/PartitionRangeReadCommand.java | 13 ++++---
 .../cassandra/db/RangeSliceVerbHandler.java     |  2 +-
 .../org/apache/cassandra/db/ReadCommand.java    | 40 +++++++++++++++-----
 .../org/apache/cassandra/db/ReadResponse.java   | 30 +++++++++++++++
 .../db/SinglePartitionReadCommand.java          |  4 +-
 .../apache/cassandra/net/MessagingService.java  |  6 +--
 7 files changed, 75 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cea8c73..47fea8b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta1
+ * Fix throwing ReadFailure instead of ReadTimeout on range queries (CASSANDRA-10125)
  * Rewrite hinted handoff (CASSANDRA-6230)
  * Fix query on static compact tables (CASSANDRA-10093)
  * Fix race during construction of commit log (CASSANDRA-10049)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index e7288cc..42d5425 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -237,13 +237,14 @@ public class PartitionRangeReadCommand extends ReadCommand
         };
     }
 
-    @SuppressWarnings("deprecation")
-    protected MessageOut<ReadCommand> createLegacyMessage()
+    public MessageOut<ReadCommand> createMessage(int version)
     {
-        if (this.dataRange.isPaging())
-            return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer);
-        else
-            return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer);
+        if (version >= MessagingService.VERSION_30)
+            return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer);
+
+        return dataRange().isPaging()
+             ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer)
+             : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer);
     }
 
     protected void appendCQLWhereClause(StringBuilder sb)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
index 3f1d660..55826f5 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
@@ -24,6 +24,6 @@ public class RangeSliceVerbHandler extends ReadCommandVerbHandler
     @Override
     protected IVersionedSerializer<ReadResponse> serializer()
     {
-        return ReadResponse.legacyRangeSliceReplySerializer;
+        return ReadResponse.rangeSliceSerializer;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 4830124..d63a832 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -59,6 +59,9 @@ public abstract class ReadCommand implements ReadQuery
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
 
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
+    // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier versions.
+    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new RangeSliceSerializer();
 
     public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
     public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
@@ -411,15 +414,7 @@ public abstract class ReadCommand implements ReadQuery
     /**
      * Creates a message for this command.
      */
-    public MessageOut<ReadCommand> createMessage(int version)
-    {
-        if (version >= MessagingService.VERSION_30)
-            return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
-
-        return createLegacyMessage();
-    }
-
-    protected abstract MessageOut<ReadCommand> createLegacyMessage();
+    public abstract MessageOut<ReadCommand> createMessage(int version);
 
     protected abstract void appendCQLWhereClause(StringBuilder sb);
 
@@ -529,6 +524,33 @@ public abstract class ReadCommand implements ReadQuery
         }
     }
 
+    // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0
+    // compatibility
+    private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
+    {
+        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+                legacyRangeSliceCommandSerializer.serialize(command, out, version);
+            else
+                serializer.serialize(command, out, version);
+        }
+
+        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return version < MessagingService.VERSION_30
+                 ? legacyRangeSliceCommandSerializer.deserialize(in, version)
+                 : serializer.deserialize(in, version);
+        }
+
+        public long serializedSize(ReadCommand command, int version)
+        {
+            return version < MessagingService.VERSION_30
+                 ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
+                 : serializer.serializedSize(command, version);
+        }
+    }
+
     private enum LegacyType
     {
         GET_BY_NAMES((byte)1),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 5f40210..21f6106 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -42,7 +42,12 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public abstract class ReadResponse
 {
+    // Serializer for single partition read response
     public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
+    // Serializer for partition range read response (this actually delegates to 'serializer' in 3.0 and to
+    // 'legacyRangeSliceReplySerializer' in older versions).
+    public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new RangeSliceSerializer();
+    // Serializer for the pre-3.0 range slice responses.
     public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
 
     // This is used only when serializing data responses and we can't it easily in other cases. So this can be null, which is slighly
@@ -397,6 +402,31 @@ public abstract class ReadResponse
         }
     }
 
+    private static class RangeSliceSerializer implements IVersionedSerializer<ReadResponse>
+    {
+        public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+                legacyRangeSliceReplySerializer.serialize(response, out, version);
+            else
+                serializer.serialize(response, out, version);
+        }
+
+        public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return version < MessagingService.VERSION_30
+                 ? legacyRangeSliceReplySerializer.deserialize(in, version)
+                 : serializer.deserialize(in, version);
+        }
+
+        public long serializedSize(ReadResponse response, int version)
+        {
+            return version < MessagingService.VERSION_30
+                 ? legacyRangeSliceReplySerializer.serializedSize(response, version)
+                 : serializer.serializedSize(response, version);
+        }
+    }
+
     private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse>
     {
         public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index d9b0e2b..ca135f8 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -395,9 +395,9 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
                              nowInSec());
     }
 
-    protected MessageOut<ReadCommand> createLegacyMessage()
+    public MessageOut<ReadCommand> createMessage(int version)
     {
-        return new MessageOut<>(MessagingService.Verb.READ, this, legacyReadCommandSerializer);
+        return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer);
     }
 
     protected void appendCQLWhereClause(StringBuilder sb)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 13632ac..e59cd58 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -107,7 +107,7 @@ public final class MessagingService implements MessagingServiceMBean
         @Deprecated STREAM_INITIATE_DONE,
         @Deprecated STREAM_REPLY,
         @Deprecated STREAM_REQUEST,
-        @Deprecated RANGE_SLICE,
+        RANGE_SLICE,
         @Deprecated BOOTSTRAP_TOKEN,
         @Deprecated TREE_REQUEST,
         @Deprecated TREE_RESPONSE,
@@ -212,7 +212,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
-        put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
+        put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
         put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
@@ -241,7 +241,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
-        put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer);
+        put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
         put(Verb.PAGED_RANGE, ReadResponse.legacyRangeSliceReplySerializer);
         put(Verb.READ, ReadResponse.serializer);
         put(Verb.TRUNCATE, TruncateResponse.serializer);


[2/7] cassandra git commit: Revert AutoSavingCache stream factory to OutputStream

Posted by be...@apache.org.
Revert AutoSavingCache stream factory to OutputStream

Adds a "non-transactional" flag to SequentialWriter
to convert its semantics to a plain OutputStream

patch by bdeggleston; reviewed by benedict


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d

Branch: refs/heads/cassandra-2.2
Commit: a3fc425dff25c42a49af38e87aa33501d4224195
Parents: 7a85c8b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 14 12:29:27 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:55:42 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
 .../cassandra/io/util/SequentialWriter.java     | 12 +++++++-
 .../utils/concurrent/Transactional.java         |  8 +++---
 .../cassandra/io/util/DataOutputTest.java       |  3 +-
 .../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
 5 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 05653ba..f0f4e8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 {
     public interface IStreamFactory
     {
-        public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
-        public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
+        InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+        OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException;
     }
 
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             return ChecksummedRandomAccessReader.open(dataPath, crcPath);
         }
 
-        public SequentialWriter getOutputWriter(File dataPath, File crcPath)
+        public OutputStream getOutputStream(File dataPath, File crcPath)
         {
-            return SequentialWriter.open(dataPath, crcPath);
+            return SequentialWriter.open(dataPath, crcPath).finishOnClose();
         }
     };
 
@@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
-            HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+            HashMap<UUID, OutputStream> streams = new HashMap<>();
             HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
 
             try
@@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!Schema.instance.hasCF(key.getCFId()))
                         continue; // the table has been dropped.
 
-                    DataOutputPlus writer = dataOutputs.get(cfId);
+                    DataOutputPlus writer = writers.get(cfId);
                     if (writer == null)
                     {
                         Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
-                        SequentialWriter sequentialWriter;
+                        OutputStream stream;
                         try
                         {
-                            sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
-                            writer = new WrappedDataOutputStreamPlus(sequentialWriter);
+                            stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
+                            writer = new WrappedDataOutputStreamPlus(stream);
                         }
                         catch (FileNotFoundException e)
                         {
                             throw new RuntimeException(e);
                         }
                         paths.put(cfId, cacheFilePaths);
-                        sequentialWriters.put(cfId, sequentialWriter);
-                        dataOutputs.put(cfId, writer);
+                        streams.put(cfId, stream);
+                        writers.put(cfId, writer);
                     }
 
                     try
@@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         // not thrown (by OHC)
                     }
 
-                for (SequentialWriter writer : sequentialWriters.values())
+                for (OutputStream writer : streams.values())
                 {
-                    writer.finish();
                     FileUtils.closeQuietly(writer);
                 }
             }
 
-            for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
+            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
             {
                 UUID cfId = entry.getKey();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 915133f..0c39469 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     protected Runnable runPostFlush;
 
     private final TransactionalProxy txnProxy = txnProxy();
+    private boolean finishOnClose;
     protected Descriptor descriptor;
 
     // due to lack of multiple-inheritance, we proxy our transactional implementation
@@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
     }
 
+    public SequentialWriter finishOnClose()
+    {
+        finishOnClose = true;
+        return this;
+    }
+
     public void write(int value) throws ClosedChannelException
     {
         if (buffer == null)
@@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     @Override
     public final void close()
     {
-        txnProxy.close();
+        if (finishOnClose)
+            txnProxy.finish();
+        else
+            txnProxy.close();
     }
 
     public final void finish()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 5b0eb8e..85c3de5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -18,10 +18,6 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge;
  * of the system should be, and so simply logging the exception is likely best (since it may have been an issue
  * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
  * should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
  */
 public interface Transactional extends AutoCloseable
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 70993d3..bbdf4e1 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -179,10 +179,9 @@ public class DataOutputTest
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
         final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
-        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
+        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose());
         DataInput canon = testWrite(write);
         write.flush();
-        writer.finish();
         write.close();
         DataInputStream test = new DataInputStream(new FileInputStream(file));
         testRead(test, canon);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ce0f918..fd38427 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -18,6 +18,7 @@
 */
 package org.apache.cassandra.io.util;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
+import com.google.common.io.Files;
 import org.junit.After;
+import org.junit.Test;
 
 import junit.framework.Assert;
 
@@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest
         }
     }
 
+    /**
+     * Tests that the output stream exposed by SequentialWriter behaves as expected
+     */
+    @Test
+    public void outputStream()
+    {
+        File tempFile = new File(Files.createTempDir(), "test.txt");
+        Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
+
+        try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+        {
+            os.writeUTF("123");
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+
+        Assert.assertTrue("temp file should exist", tempFile.exists());
+    }
+
 }


[3/7] cassandra git commit: Revert AutoSavingCache stream factory to OutputStream

Posted by be...@apache.org.
Revert AutoSavingCache stream factory to OutputStream

Adds a "non-transactional" flag to SequentialWriter
to convert its semantics to a plain OutputStream

patch by bdeggleston; reviewed by benedict


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d

Branch: refs/heads/trunk
Commit: a3fc425dff25c42a49af38e87aa33501d4224195
Parents: 7a85c8b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 14 12:29:27 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:55:42 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
 .../cassandra/io/util/SequentialWriter.java     | 12 +++++++-
 .../utils/concurrent/Transactional.java         |  8 +++---
 .../cassandra/io/util/DataOutputTest.java       |  3 +-
 .../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
 5 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 05653ba..f0f4e8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 {
     public interface IStreamFactory
     {
-        public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
-        public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
+        InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+        OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException;
     }
 
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             return ChecksummedRandomAccessReader.open(dataPath, crcPath);
         }
 
-        public SequentialWriter getOutputWriter(File dataPath, File crcPath)
+        public OutputStream getOutputStream(File dataPath, File crcPath)
         {
-            return SequentialWriter.open(dataPath, crcPath);
+            return SequentialWriter.open(dataPath, crcPath).finishOnClose();
         }
     };
 
@@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
-            HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+            HashMap<UUID, OutputStream> streams = new HashMap<>();
             HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
 
             try
@@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!Schema.instance.hasCF(key.getCFId()))
                         continue; // the table has been dropped.
 
-                    DataOutputPlus writer = dataOutputs.get(cfId);
+                    DataOutputPlus writer = writers.get(cfId);
                     if (writer == null)
                     {
                         Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
-                        SequentialWriter sequentialWriter;
+                        OutputStream stream;
                         try
                         {
-                            sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
-                            writer = new WrappedDataOutputStreamPlus(sequentialWriter);
+                            stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
+                            writer = new WrappedDataOutputStreamPlus(stream);
                         }
                         catch (FileNotFoundException e)
                         {
                             throw new RuntimeException(e);
                         }
                         paths.put(cfId, cacheFilePaths);
-                        sequentialWriters.put(cfId, sequentialWriter);
-                        dataOutputs.put(cfId, writer);
+                        streams.put(cfId, stream);
+                        writers.put(cfId, writer);
                     }
 
                     try
@@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         // not thrown (by OHC)
                     }
 
-                for (SequentialWriter writer : sequentialWriters.values())
+                for (OutputStream writer : streams.values())
                 {
-                    writer.finish();
                     FileUtils.closeQuietly(writer);
                 }
             }
 
-            for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
+            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
             {
                 UUID cfId = entry.getKey();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 915133f..0c39469 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     protected Runnable runPostFlush;
 
     private final TransactionalProxy txnProxy = txnProxy();
+    private boolean finishOnClose;
     protected Descriptor descriptor;
 
     // due to lack of multiple-inheritance, we proxy our transactional implementation
@@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
     }
 
+    public SequentialWriter finishOnClose()
+    {
+        finishOnClose = true;
+        return this;
+    }
+
     public void write(int value) throws ClosedChannelException
     {
         if (buffer == null)
@@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     @Override
     public final void close()
     {
-        txnProxy.close();
+        if (finishOnClose)
+            txnProxy.finish();
+        else
+            txnProxy.close();
     }
 
     public final void finish()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 5b0eb8e..85c3de5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -18,10 +18,6 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge;
  * of the system should be, and so simply logging the exception is likely best (since it may have been an issue
  * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
  * should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
  */
 public interface Transactional extends AutoCloseable
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 70993d3..bbdf4e1 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -179,10 +179,9 @@ public class DataOutputTest
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
         final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
-        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
+        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose());
         DataInput canon = testWrite(write);
         write.flush();
-        writer.finish();
         write.close();
         DataInputStream test = new DataInputStream(new FileInputStream(file));
         testRead(test, canon);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ce0f918..fd38427 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -18,6 +18,7 @@
 */
 package org.apache.cassandra.io.util;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
+import com.google.common.io.Files;
 import org.junit.After;
+import org.junit.Test;
 
 import junit.framework.Assert;
 
@@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest
         }
     }
 
+    /**
+     * Tests that the output stream exposed by SequentialWriter behaves as expected
+     */
+    @Test
+    public void outputStream()
+    {
+        File tempFile = new File(Files.createTempDir(), "test.txt");
+        Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
+
+        try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+        {
+            os.writeUTF("123");
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+
+        Assert.assertTrue("temp file should exist", tempFile.exists());
+    }
+
 }


[6/7] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by be...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0

Conflicts:
	src/java/org/apache/cassandra/io/util/SequentialWriter.java
	src/java/org/apache/cassandra/utils/concurrent/Transactional.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c27abf6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c27abf6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c27abf6

Branch: refs/heads/cassandra-3.0
Commit: 1c27abf6c3f8199cd75b8b1930ce3a8285e627bb
Parents: be45eb6 a3fc425
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Aug 21 09:58:27 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:58:27 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
 .../cassandra/io/util/SequentialWriter.java     | 12 +++++++-
 .../utils/concurrent/Transactional.java         |  7 ++++-
 .../cassandra/io/util/DataOutputTest.java       |  3 +-
 .../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
 5 files changed, 56 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 905a5c6,0c39469..0375e23
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -170,6 -168,79 +171,12 @@@ public class SequentialWriter extends B
          return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
      }
  
+     public SequentialWriter finishOnClose()
+     {
+         finishOnClose = true;
+         return this;
+     }
+ 
 -    public void write(int value) throws ClosedChannelException
 -    {
 -        if (buffer == null)
 -            throw new ClosedChannelException();
 -
 -        if (!buffer.hasRemaining())
 -        {
 -            reBuffer();
 -        }
 -
 -        buffer.put((byte) value);
 -
 -        isDirty = true;
 -        syncNeeded = true;
 -    }
 -
 -    public void write(byte[] buffer) throws IOException
 -    {
 -        write(buffer, 0, buffer.length);
 -    }
 -
 -    public void write(byte[] data, int offset, int length) throws IOException
 -    {
 -        if (buffer == null)
 -            throw new ClosedChannelException();
 -
 -        int position = offset;
 -        int remaining = length;
 -        while (remaining > 0)
 -        {
 -            if (!buffer.hasRemaining())
 -                reBuffer();
 -
 -            int toCopy = Math.min(remaining, buffer.remaining());
 -            buffer.put(data, position, toCopy);
 -
 -            remaining -= toCopy;
 -            position += toCopy;
 -
 -            isDirty = true;
 -            syncNeeded = true;
 -        }
 -    }
 -
 -    public int write(ByteBuffer src) throws IOException
 -    {
 -        if (buffer == null)
 -            throw new ClosedChannelException();
 -
 -        int length = src.remaining();
 -        int finalLimit = src.limit();
 -        while (src.hasRemaining())
 -        {
 -            if (!buffer.hasRemaining())
 -                reBuffer();
 -
 -            if (buffer.remaining() < src.remaining())
 -                src.limit(src.position() + buffer.remaining());
 -            buffer.put(src);
 -            src.limit(finalLimit);
 -
 -            isDirty = true;
 -            syncNeeded = true;
 -        }
 -        return length;
 -    }
 -
      /**
       * Synchronize file contents with disk.
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index f79a795,85c3de5..02562ce
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@@ -41,14 -41,14 +41,19 @@@ import static org.apache.cassandra.util
   * If everything completes normally, then on exiting the try block the auto close method will invoke cleanup
   * to release any temporary state/resources
   *
 - * No exceptions should be thrown during commit; if they are, it is not at all clear what the correct behaviour
 + * All exceptions and assertions that may be thrown should be checked and ruled out during commit preparation.
 + * Commit should generally never throw an exception unless there is a real correctness-affecting exception that
 + * cannot be moved to prepareToCommit, in which case this operation MUST be executed before any other commit
 + * methods in the object graph.
 + *
 + * If exceptions are generated by commit after this initial moment, it is not at all clear what the correct behaviour
   * of the system should be, and so simply logging the exception is likely best (since it may have been an issue
-  * during cleanup, say), and rollback cannot now occur.
+  * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
+  * should be checked and ruled out during commit preparation.
+  *
+  * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+  * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+  * most AutoCloseable implementations.
   */
  public interface Transactional extends AutoCloseable
  {


[4/7] cassandra git commit: Revert AutoSavingCache stream factory to OutputStream

Posted by be...@apache.org.
Revert AutoSavingCache stream factory to OutputStream

Adds a "non-transactional" flag to SequentialWriter
to convert its semantics to a plain OutputStream

patch by bdeggleston; reviewed by benedict


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d

Branch: refs/heads/cassandra-3.0
Commit: a3fc425dff25c42a49af38e87aa33501d4224195
Parents: 7a85c8b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 14 12:29:27 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:55:42 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
 .../cassandra/io/util/SequentialWriter.java     | 12 +++++++-
 .../utils/concurrent/Transactional.java         |  8 +++---
 .../cassandra/io/util/DataOutputTest.java       |  3 +-
 .../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
 5 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 05653ba..f0f4e8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 {
     public interface IStreamFactory
     {
-        public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
-        public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
+        InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+        OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException;
     }
 
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             return ChecksummedRandomAccessReader.open(dataPath, crcPath);
         }
 
-        public SequentialWriter getOutputWriter(File dataPath, File crcPath)
+        public OutputStream getOutputStream(File dataPath, File crcPath)
         {
-            return SequentialWriter.open(dataPath, crcPath);
+            return SequentialWriter.open(dataPath, crcPath).finishOnClose();
         }
     };
 
@@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
-            HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+            HashMap<UUID, OutputStream> streams = new HashMap<>();
             HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
 
             try
@@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!Schema.instance.hasCF(key.getCFId()))
                         continue; // the table has been dropped.
 
-                    DataOutputPlus writer = dataOutputs.get(cfId);
+                    DataOutputPlus writer = writers.get(cfId);
                     if (writer == null)
                     {
                         Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
-                        SequentialWriter sequentialWriter;
+                        OutputStream stream;
                         try
                         {
-                            sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
-                            writer = new WrappedDataOutputStreamPlus(sequentialWriter);
+                            stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
+                            writer = new WrappedDataOutputStreamPlus(stream);
                         }
                         catch (FileNotFoundException e)
                         {
                             throw new RuntimeException(e);
                         }
                         paths.put(cfId, cacheFilePaths);
-                        sequentialWriters.put(cfId, sequentialWriter);
-                        dataOutputs.put(cfId, writer);
+                        streams.put(cfId, stream);
+                        writers.put(cfId, writer);
                     }
 
                     try
@@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         // not thrown (by OHC)
                     }
 
-                for (SequentialWriter writer : sequentialWriters.values())
+                for (OutputStream writer : streams.values())
                 {
-                    writer.finish();
                     FileUtils.closeQuietly(writer);
                 }
             }
 
-            for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
+            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
             {
                 UUID cfId = entry.getKey();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 915133f..0c39469 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     protected Runnable runPostFlush;
 
     private final TransactionalProxy txnProxy = txnProxy();
+    private boolean finishOnClose;
     protected Descriptor descriptor;
 
     // due to lack of multiple-inheritance, we proxy our transactional implementation
@@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
     }
 
+    public SequentialWriter finishOnClose()
+    {
+        finishOnClose = true;
+        return this;
+    }
+
     public void write(int value) throws ClosedChannelException
     {
         if (buffer == null)
@@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     @Override
     public final void close()
     {
-        txnProxy.close();
+        if (finishOnClose)
+            txnProxy.finish();
+        else
+            txnProxy.close();
     }
 
     public final void finish()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 5b0eb8e..85c3de5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -18,10 +18,6 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge;
  * of the system should be, and so simply logging the exception is likely best (since it may have been an issue
  * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
  * should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
  */
 public interface Transactional extends AutoCloseable
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 70993d3..bbdf4e1 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -179,10 +179,9 @@ public class DataOutputTest
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
         final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
-        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
+        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose());
         DataInput canon = testWrite(write);
         write.flush();
-        writer.finish();
         write.close();
         DataInputStream test = new DataInputStream(new FileInputStream(file));
         testRead(test, canon);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ce0f918..fd38427 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -18,6 +18,7 @@
 */
 package org.apache.cassandra.io.util;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
+import com.google.common.io.Files;
 import org.junit.After;
+import org.junit.Test;
 
 import junit.framework.Assert;
 
@@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest
         }
     }
 
+    /**
+     * Tests that the output stream exposed by SequentialWriter behaves as expected
+     */
+    @Test
+    public void outputStream()
+    {
+        File tempFile = new File(Files.createTempDir(), "test.txt");
+        Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
+
+        try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+        {
+            os.writeUTF("123");
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+
+        Assert.assertTrue("temp file should exist", tempFile.exists());
+    }
+
 }


[5/7] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by be...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0

Conflicts:
	src/java/org/apache/cassandra/io/util/SequentialWriter.java
	src/java/org/apache/cassandra/utils/concurrent/Transactional.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c27abf6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c27abf6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c27abf6

Branch: refs/heads/trunk
Commit: 1c27abf6c3f8199cd75b8b1930ce3a8285e627bb
Parents: be45eb6 a3fc425
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Aug 21 09:58:27 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:58:27 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
 .../cassandra/io/util/SequentialWriter.java     | 12 +++++++-
 .../utils/concurrent/Transactional.java         |  7 ++++-
 .../cassandra/io/util/DataOutputTest.java       |  3 +-
 .../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
 5 files changed, 56 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 905a5c6,0c39469..0375e23
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -170,6 -168,79 +171,12 @@@ public class SequentialWriter extends B
          return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
      }
  
+     public SequentialWriter finishOnClose()
+     {
+         finishOnClose = true;
+         return this;
+     }
+ 
 -    public void write(int value) throws ClosedChannelException
 -    {
 -        if (buffer == null)
 -            throw new ClosedChannelException();
 -
 -        if (!buffer.hasRemaining())
 -        {
 -            reBuffer();
 -        }
 -
 -        buffer.put((byte) value);
 -
 -        isDirty = true;
 -        syncNeeded = true;
 -    }
 -
 -    public void write(byte[] buffer) throws IOException
 -    {
 -        write(buffer, 0, buffer.length);
 -    }
 -
 -    public void write(byte[] data, int offset, int length) throws IOException
 -    {
 -        if (buffer == null)
 -            throw new ClosedChannelException();
 -
 -        int position = offset;
 -        int remaining = length;
 -        while (remaining > 0)
 -        {
 -            if (!buffer.hasRemaining())
 -                reBuffer();
 -
 -            int toCopy = Math.min(remaining, buffer.remaining());
 -            buffer.put(data, position, toCopy);
 -
 -            remaining -= toCopy;
 -            position += toCopy;
 -
 -            isDirty = true;
 -            syncNeeded = true;
 -        }
 -    }
 -
 -    public int write(ByteBuffer src) throws IOException
 -    {
 -        if (buffer == null)
 -            throw new ClosedChannelException();
 -
 -        int length = src.remaining();
 -        int finalLimit = src.limit();
 -        while (src.hasRemaining())
 -        {
 -            if (!buffer.hasRemaining())
 -                reBuffer();
 -
 -            if (buffer.remaining() < src.remaining())
 -                src.limit(src.position() + buffer.remaining());
 -            buffer.put(src);
 -            src.limit(finalLimit);
 -
 -            isDirty = true;
 -            syncNeeded = true;
 -        }
 -        return length;
 -    }
 -
      /**
       * Synchronize file contents with disk.
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index f79a795,85c3de5..02562ce
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@@ -41,14 -41,14 +41,19 @@@ import static org.apache.cassandra.util
   * If everything completes normally, then on exiting the try block the auto close method will invoke cleanup
   * to release any temporary state/resources
   *
 - * No exceptions should be thrown during commit; if they are, it is not at all clear what the correct behaviour
 + * All exceptions and assertions that may be thrown should be checked and ruled out during commit preparation.
 + * Commit should generally never throw an exception unless there is a real correctness-affecting exception that
 + * cannot be moved to prepareToCommit, in which case this operation MUST be executed before any other commit
 + * methods in the object graph.
 + *
 + * If exceptions are generated by commit after this initial moment, it is not at all clear what the correct behaviour
   * of the system should be, and so simply logging the exception is likely best (since it may have been an issue
-  * during cleanup, say), and rollback cannot now occur.
+  * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
+  * should be checked and ruled out during commit preparation.
+  *
+  * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+  * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+  * most AutoCloseable implementations.
   */
  public interface Transactional extends AutoCloseable
  {


[7/7] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e951c98
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e951c98
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e951c98

Branch: refs/heads/trunk
Commit: 1e951c983197c696b9c456406125e8f7b5843a6b
Parents: 0fd857b 1c27abf
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Aug 21 09:58:35 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:58:35 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 29 +++++++-------
 .../cassandra/db/PartitionRangeReadCommand.java | 13 ++++---
 .../cassandra/db/RangeSliceVerbHandler.java     |  2 +-
 .../org/apache/cassandra/db/ReadCommand.java    | 40 +++++++++++++++-----
 .../org/apache/cassandra/db/ReadResponse.java   | 30 +++++++++++++++
 .../db/SinglePartitionReadCommand.java          |  4 +-
 .../cassandra/io/util/SequentialWriter.java     | 12 +++++-
 .../apache/cassandra/net/MessagingService.java  |  6 +--
 .../utils/concurrent/Transactional.java         |  7 +++-
 .../cassandra/io/util/DataOutputTest.java       |  3 +-
 .../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++
 12 files changed, 131 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e951c98/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e6093ad,47fea8b..d03b430
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 +3.2
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +
 +
  3.0.0-beta1
+  * Fix throwing ReadFailure instead of ReadTimeout on range queries (CASSANDRA-10125)
   * Rewrite hinted handoff (CASSANDRA-6230)
   * Fix query on static compact tables (CASSANDRA-10093)
   * Fix race during construction of commit log (CASSANDRA-10049)