You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/21 11:18:21 UTC
[1/7] cassandra git commit: Re-introduce the use of Verb.RANGE_SLICE
for range queries explanation
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 7a85c8b8f -> a3fc425df
refs/heads/cassandra-3.0 be45eb6be -> 1c27abf6c
refs/heads/trunk 0fd857baf -> 1e951c983
Re-introduce the use of Verb.RANGE_SLICE for range queries
explanation
patch by slebresne; reviewed by aweisberg for CASSANDRA-10125
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be45eb6b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be45eb6b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be45eb6b
Branch: refs/heads/trunk
Commit: be45eb6bea4a0419dd058e1787a482263b2ff198
Parents: 9ed2727
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Aug 19 11:52:18 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 21 09:58:32 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/PartitionRangeReadCommand.java | 13 ++++---
.../cassandra/db/RangeSliceVerbHandler.java | 2 +-
.../org/apache/cassandra/db/ReadCommand.java | 40 +++++++++++++++-----
.../org/apache/cassandra/db/ReadResponse.java | 30 +++++++++++++++
.../db/SinglePartitionReadCommand.java | 4 +-
.../apache/cassandra/net/MessagingService.java | 6 +--
7 files changed, 75 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cea8c73..47fea8b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta1
+ * Fix throwing ReadFailure instead of ReadTimeout on range queries (CASSANDRA-10125)
* Rewrite hinted handoff (CASSANDRA-6230)
* Fix query on static compact tables (CASSANDRA-10093)
* Fix race during construction of commit log (CASSANDRA-10049)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index e7288cc..42d5425 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -237,13 +237,14 @@ public class PartitionRangeReadCommand extends ReadCommand
};
}
- @SuppressWarnings("deprecation")
- protected MessageOut<ReadCommand> createLegacyMessage()
+ public MessageOut<ReadCommand> createMessage(int version)
{
- if (this.dataRange.isPaging())
- return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer);
- else
- return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer);
+ if (version >= MessagingService.VERSION_30)
+ return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer);
+
+ return dataRange().isPaging()
+ ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer)
+ : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer);
}
protected void appendCQLWhereClause(StringBuilder sb)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
index 3f1d660..55826f5 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
@@ -24,6 +24,6 @@ public class RangeSliceVerbHandler extends ReadCommandVerbHandler
@Override
protected IVersionedSerializer<ReadResponse> serializer()
{
- return ReadResponse.legacyRangeSliceReplySerializer;
+ return ReadResponse.rangeSliceSerializer;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 4830124..d63a832 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -59,6 +59,9 @@ public abstract class ReadCommand implements ReadQuery
protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
+ // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version.
+ // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+ public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new RangeSliceSerializer();
public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
@@ -411,15 +414,7 @@ public abstract class ReadCommand implements ReadQuery
/**
* Creates a message for this command.
*/
- public MessageOut<ReadCommand> createMessage(int version)
- {
- if (version >= MessagingService.VERSION_30)
- return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
-
- return createLegacyMessage();
- }
-
- protected abstract MessageOut<ReadCommand> createLegacyMessage();
+ public abstract MessageOut<ReadCommand> createMessage(int version);
protected abstract void appendCQLWhereClause(StringBuilder sb);
@@ -529,6 +524,33 @@ public abstract class ReadCommand implements ReadQuery
}
}
+ // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0
+ // compatibility
+ private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
+ {
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ if (version < MessagingService.VERSION_30)
+ legacyRangeSliceCommandSerializer.serialize(command, out, version);
+ else
+ serializer.serialize(command, out, version);
+ }
+
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return version < MessagingService.VERSION_30
+ ? legacyRangeSliceCommandSerializer.deserialize(in, version)
+ : serializer.deserialize(in, version);
+ }
+
+ public long serializedSize(ReadCommand command, int version)
+ {
+ return version < MessagingService.VERSION_30
+ ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
+ : serializer.serializedSize(command, version);
+ }
+ }
+
private enum LegacyType
{
GET_BY_NAMES((byte)1),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 5f40210..21f6106 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -42,7 +42,12 @@ import org.apache.cassandra.utils.FBUtilities;
public abstract class ReadResponse
{
+ // Serializer for single partition read response
public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
+ // Serializer for partition range read response (this actually delegate to 'serializer' in 3.0 and to
+ // 'legacyRangeSliceReplySerializer' in older version.
+ public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new RangeSliceSerializer();
+ // Serializer for the pre-3.0 rang slice responses.
public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
// This is used only when serializing data responses and we can't it easily in other cases. So this can be null, which is slighly
@@ -397,6 +402,31 @@ public abstract class ReadResponse
}
}
+ private static class RangeSliceSerializer implements IVersionedSerializer<ReadResponse>
+ {
+ public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
+ {
+ if (version < MessagingService.VERSION_30)
+ legacyRangeSliceReplySerializer.serialize(response, out, version);
+ else
+ serializer.serialize(response, out, version);
+ }
+
+ public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return version < MessagingService.VERSION_30
+ ? legacyRangeSliceReplySerializer.deserialize(in, version)
+ : serializer.deserialize(in, version);
+ }
+
+ public long serializedSize(ReadResponse response, int version)
+ {
+ return version < MessagingService.VERSION_30
+ ? legacyRangeSliceReplySerializer.serializedSize(response, version)
+ : serializer.serializedSize(response, version);
+ }
+ }
+
private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse>
{
public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index d9b0e2b..ca135f8 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -395,9 +395,9 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
nowInSec());
}
- protected MessageOut<ReadCommand> createLegacyMessage()
+ public MessageOut<ReadCommand> createMessage(int version)
{
- return new MessageOut<>(MessagingService.Verb.READ, this, legacyReadCommandSerializer);
+ return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer);
}
protected void appendCQLWhereClause(StringBuilder sb)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be45eb6b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 13632ac..e59cd58 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -107,7 +107,7 @@ public final class MessagingService implements MessagingServiceMBean
@Deprecated STREAM_INITIATE_DONE,
@Deprecated STREAM_REPLY,
@Deprecated STREAM_REQUEST,
- @Deprecated RANGE_SLICE,
+ RANGE_SLICE,
@Deprecated BOOTSTRAP_TOKEN,
@Deprecated TREE_REQUEST,
@Deprecated TREE_RESPONSE,
@@ -212,7 +212,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
put(Verb.READ_REPAIR, Mutation.serializer);
put(Verb.READ, ReadCommand.serializer);
- put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
+ put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
@@ -241,7 +241,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
put(Verb.READ_REPAIR, WriteResponse.serializer);
put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
- put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer);
+ put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
put(Verb.PAGED_RANGE, ReadResponse.legacyRangeSliceReplySerializer);
put(Verb.READ, ReadResponse.serializer);
put(Verb.TRUNCATE, TruncateResponse.serializer);
[2/7] cassandra git commit: Revert AutoSavingCache stream factory to
OutputStream
Posted by be...@apache.org.
Revert AutoSavingCache stream factory to OutputStream
Adds a "non-transactional" flag to SequentialWriter
to convert its semantics to a plain OutputStream
patch by bdeggleston; reviewed by benedict
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d
Branch: refs/heads/cassandra-2.2
Commit: a3fc425dff25c42a49af38e87aa33501d4224195
Parents: 7a85c8b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 14 12:29:27 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:55:42 2015 +0100
----------------------------------------------------------------------
.../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
.../cassandra/io/util/SequentialWriter.java | 12 +++++++-
.../utils/concurrent/Transactional.java | 8 +++---
.../cassandra/io/util/DataOutputTest.java | 3 +-
.../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
5 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 05653ba..f0f4e8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
public interface IStreamFactory
{
- public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
- public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
+ InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+ OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException;
}
private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
return ChecksummedRandomAccessReader.open(dataPath, crcPath);
}
- public SequentialWriter getOutputWriter(File dataPath, File crcPath)
+ public OutputStream getOutputStream(File dataPath, File crcPath)
{
- return SequentialWriter.open(dataPath, crcPath);
+ return SequentialWriter.open(dataPath, crcPath).finishOnClose();
}
};
@@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
- HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
- HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+ HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+ HashMap<UUID, OutputStream> streams = new HashMap<>();
HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
try
@@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
if (!Schema.instance.hasCF(key.getCFId()))
continue; // the table has been dropped.
- DataOutputPlus writer = dataOutputs.get(cfId);
+ DataOutputPlus writer = writers.get(cfId);
if (writer == null)
{
Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
- SequentialWriter sequentialWriter;
+ OutputStream stream;
try
{
- sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
- writer = new WrappedDataOutputStreamPlus(sequentialWriter);
+ stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
+ writer = new WrappedDataOutputStreamPlus(stream);
}
catch (FileNotFoundException e)
{
throw new RuntimeException(e);
}
paths.put(cfId, cacheFilePaths);
- sequentialWriters.put(cfId, sequentialWriter);
- dataOutputs.put(cfId, writer);
+ streams.put(cfId, stream);
+ writers.put(cfId, writer);
}
try
@@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
// not thrown (by OHC)
}
- for (SequentialWriter writer : sequentialWriters.values())
+ for (OutputStream writer : streams.values())
{
- writer.finish();
FileUtils.closeQuietly(writer);
}
}
- for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
+ for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
{
UUID cfId = entry.getKey();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 915133f..0c39469 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected Runnable runPostFlush;
private final TransactionalProxy txnProxy = txnProxy();
+ private boolean finishOnClose;
protected Descriptor descriptor;
// due to lack of multiple-inheritance, we proxy our transactional implementation
@@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
+ public SequentialWriter finishOnClose()
+ {
+ finishOnClose = true;
+ return this;
+ }
+
public void write(int value) throws ClosedChannelException
{
if (buffer == null)
@@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
@Override
public final void close()
{
- txnProxy.close();
+ if (finishOnClose)
+ txnProxy.finish();
+ else
+ txnProxy.close();
}
public final void finish()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 5b0eb8e..85c3de5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -18,10 +18,6 @@
*/
package org.apache.cassandra.utils.concurrent;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge;
* of the system should be, and so simply logging the exception is likely best (since it may have been an issue
* during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
* should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
*/
public interface Transactional extends AutoCloseable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 70993d3..bbdf4e1 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -179,10 +179,9 @@ public class DataOutputTest
{
File file = FileUtils.createTempFile("dataoutput", "test");
final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
- DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
+ DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose());
DataInput canon = testWrite(write);
write.flush();
- writer.finish();
write.close();
DataInputStream test = new DataInputStream(new FileInputStream(file));
testRead(test, canon);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ce0f918..fd38427 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.cassandra.io.util;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import com.google.common.io.Files;
import org.junit.After;
+import org.junit.Test;
import junit.framework.Assert;
@@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest
}
}
+ /**
+ * Tests that the output stream exposed by SequentialWriter behaves as expected
+ */
+ @Test
+ public void outputStream()
+ {
+ File tempFile = new File(Files.createTempDir(), "test.txt");
+ Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
+
+ try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+ {
+ os.writeUTF("123");
+ }
+ catch (IOException e)
+ {
+ Assert.fail();
+ }
+
+ Assert.assertTrue("temp file should exist", tempFile.exists());
+ }
+
}
[3/7] cassandra git commit: Revert AutoSavingCache stream factory to
OutputStream
Posted by be...@apache.org.
Revert AutoSavingCache stream factory to OutputStream
Adds a "non-transactional" flag to SequentialWriter
to convert its semantics to a plain OutputStream
patch by bdeggleston; reviewed by benedict
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d
Branch: refs/heads/trunk
Commit: a3fc425dff25c42a49af38e87aa33501d4224195
Parents: 7a85c8b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 14 12:29:27 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:55:42 2015 +0100
----------------------------------------------------------------------
.../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
.../cassandra/io/util/SequentialWriter.java | 12 +++++++-
.../utils/concurrent/Transactional.java | 8 +++---
.../cassandra/io/util/DataOutputTest.java | 3 +-
.../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
5 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 05653ba..f0f4e8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
public interface IStreamFactory
{
- public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
- public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
+ InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+ OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException;
}
private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
return ChecksummedRandomAccessReader.open(dataPath, crcPath);
}
- public SequentialWriter getOutputWriter(File dataPath, File crcPath)
+ public OutputStream getOutputStream(File dataPath, File crcPath)
{
- return SequentialWriter.open(dataPath, crcPath);
+ return SequentialWriter.open(dataPath, crcPath).finishOnClose();
}
};
@@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
- HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
- HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+ HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+ HashMap<UUID, OutputStream> streams = new HashMap<>();
HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
try
@@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
if (!Schema.instance.hasCF(key.getCFId()))
continue; // the table has been dropped.
- DataOutputPlus writer = dataOutputs.get(cfId);
+ DataOutputPlus writer = writers.get(cfId);
if (writer == null)
{
Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
- SequentialWriter sequentialWriter;
+ OutputStream stream;
try
{
- sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
- writer = new WrappedDataOutputStreamPlus(sequentialWriter);
+ stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
+ writer = new WrappedDataOutputStreamPlus(stream);
}
catch (FileNotFoundException e)
{
throw new RuntimeException(e);
}
paths.put(cfId, cacheFilePaths);
- sequentialWriters.put(cfId, sequentialWriter);
- dataOutputs.put(cfId, writer);
+ streams.put(cfId, stream);
+ writers.put(cfId, writer);
}
try
@@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
// not thrown (by OHC)
}
- for (SequentialWriter writer : sequentialWriters.values())
+ for (OutputStream writer : streams.values())
{
- writer.finish();
FileUtils.closeQuietly(writer);
}
}
- for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
+ for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
{
UUID cfId = entry.getKey();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 915133f..0c39469 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected Runnable runPostFlush;
private final TransactionalProxy txnProxy = txnProxy();
+ private boolean finishOnClose;
protected Descriptor descriptor;
// due to lack of multiple-inheritance, we proxy our transactional implementation
@@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
+ public SequentialWriter finishOnClose()
+ {
+ finishOnClose = true;
+ return this;
+ }
+
public void write(int value) throws ClosedChannelException
{
if (buffer == null)
@@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
@Override
public final void close()
{
- txnProxy.close();
+ if (finishOnClose)
+ txnProxy.finish();
+ else
+ txnProxy.close();
}
public final void finish()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 5b0eb8e..85c3de5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -18,10 +18,6 @@
*/
package org.apache.cassandra.utils.concurrent;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge;
* of the system should be, and so simply logging the exception is likely best (since it may have been an issue
* during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
* should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
*/
public interface Transactional extends AutoCloseable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 70993d3..bbdf4e1 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -179,10 +179,9 @@ public class DataOutputTest
{
File file = FileUtils.createTempFile("dataoutput", "test");
final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
- DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
+ DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose());
DataInput canon = testWrite(write);
write.flush();
- writer.finish();
write.close();
DataInputStream test = new DataInputStream(new FileInputStream(file));
testRead(test, canon);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ce0f918..fd38427 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.cassandra.io.util;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import com.google.common.io.Files;
import org.junit.After;
+import org.junit.Test;
import junit.framework.Assert;
@@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest
}
}
+ /**
+ * Tests that the output stream exposed by SequentialWriter behaves as expected
+ */
+ @Test
+ public void outputStream()
+ {
+ File tempFile = new File(Files.createTempDir(), "test.txt");
+ Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
+
+ try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+ {
+ os.writeUTF("123");
+ }
+ catch (IOException e)
+ {
+ Assert.fail();
+ }
+
+ Assert.assertTrue("temp file should exist", tempFile.exists());
+ }
+
}
[6/7] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by be...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Conflicts:
src/java/org/apache/cassandra/io/util/SequentialWriter.java
src/java/org/apache/cassandra/utils/concurrent/Transactional.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c27abf6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c27abf6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c27abf6
Branch: refs/heads/cassandra-3.0
Commit: 1c27abf6c3f8199cd75b8b1930ce3a8285e627bb
Parents: be45eb6 a3fc425
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Aug 21 09:58:27 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:58:27 2015 +0100
----------------------------------------------------------------------
.../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
.../cassandra/io/util/SequentialWriter.java | 12 +++++++-
.../utils/concurrent/Transactional.java | 7 ++++-
.../cassandra/io/util/DataOutputTest.java | 3 +-
.../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
5 files changed, 56 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 905a5c6,0c39469..0375e23
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -170,6 -168,79 +171,12 @@@ public class SequentialWriter extends B
return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
+ public SequentialWriter finishOnClose()
+ {
+ finishOnClose = true;
+ return this;
+ }
+
- public void write(int value) throws ClosedChannelException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- if (!buffer.hasRemaining())
- {
- reBuffer();
- }
-
- buffer.put((byte) value);
-
- isDirty = true;
- syncNeeded = true;
- }
-
- public void write(byte[] buffer) throws IOException
- {
- write(buffer, 0, buffer.length);
- }
-
- public void write(byte[] data, int offset, int length) throws IOException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- int position = offset;
- int remaining = length;
- while (remaining > 0)
- {
- if (!buffer.hasRemaining())
- reBuffer();
-
- int toCopy = Math.min(remaining, buffer.remaining());
- buffer.put(data, position, toCopy);
-
- remaining -= toCopy;
- position += toCopy;
-
- isDirty = true;
- syncNeeded = true;
- }
- }
-
- public int write(ByteBuffer src) throws IOException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- int length = src.remaining();
- int finalLimit = src.limit();
- while (src.hasRemaining())
- {
- if (!buffer.hasRemaining())
- reBuffer();
-
- if (buffer.remaining() < src.remaining())
- src.limit(src.position() + buffer.remaining());
- buffer.put(src);
- src.limit(finalLimit);
-
- isDirty = true;
- syncNeeded = true;
- }
- return length;
- }
-
/**
* Synchronize file contents with disk.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index f79a795,85c3de5..02562ce
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@@ -41,14 -41,14 +41,19 @@@ import static org.apache.cassandra.util
* If everything completes normally, then on exiting the try block the auto close method will invoke cleanup
* to release any temporary state/resources
*
- * No exceptions should be thrown during commit; if they are, it is not at all clear what the correct behaviour
+ * All exceptions and assertions that may be thrown should be checked and ruled out during commit preparation.
+ * Commit should generally never throw an exception unless there is a real correctness-affecting exception that
+ * cannot be moved to prepareToCommit, in which case this operation MUST be executed before any other commit
+ * methods in the object graph.
+ *
+ * If exceptions are generated by commit after this initial moment, it is not at all clear what the correct behaviour
* of the system should be, and so simply logging the exception is likely best (since it may have been an issue
- * during cleanup, say), and rollback cannot now occur.
+ * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
+ * should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
*/
public interface Transactional extends AutoCloseable
{
[4/7] cassandra git commit: Revert AutoSavingCache stream factory to
OutputStream
Posted by be...@apache.org.
Revert AutoSavingCache stream factory to OutputStream
Adds a "non-transactional" flag to SequentialWriter
to convert its semantics to a plain OutputStream
patch by bdeggleston; reviewed by benedict
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d
Branch: refs/heads/cassandra-3.0
Commit: a3fc425dff25c42a49af38e87aa33501d4224195
Parents: 7a85c8b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 14 12:29:27 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:55:42 2015 +0100
----------------------------------------------------------------------
.../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
.../cassandra/io/util/SequentialWriter.java | 12 +++++++-
.../utils/concurrent/Transactional.java | 8 +++---
.../cassandra/io/util/DataOutputTest.java | 3 +-
.../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
5 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 05653ba..f0f4e8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
public interface IStreamFactory
{
- public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
- public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
+ InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+ OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException;
}
private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
return ChecksummedRandomAccessReader.open(dataPath, crcPath);
}
- public SequentialWriter getOutputWriter(File dataPath, File crcPath)
+ public OutputStream getOutputStream(File dataPath, File crcPath)
{
- return SequentialWriter.open(dataPath, crcPath);
+ return SequentialWriter.open(dataPath, crcPath).finishOnClose();
}
};
@@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
- HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
- HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+ HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+ HashMap<UUID, OutputStream> streams = new HashMap<>();
HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
try
@@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
if (!Schema.instance.hasCF(key.getCFId()))
continue; // the table has been dropped.
- DataOutputPlus writer = dataOutputs.get(cfId);
+ DataOutputPlus writer = writers.get(cfId);
if (writer == null)
{
Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
- SequentialWriter sequentialWriter;
+ OutputStream stream;
try
{
- sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
- writer = new WrappedDataOutputStreamPlus(sequentialWriter);
+ stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
+ writer = new WrappedDataOutputStreamPlus(stream);
}
catch (FileNotFoundException e)
{
throw new RuntimeException(e);
}
paths.put(cfId, cacheFilePaths);
- sequentialWriters.put(cfId, sequentialWriter);
- dataOutputs.put(cfId, writer);
+ streams.put(cfId, stream);
+ writers.put(cfId, writer);
}
try
@@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
// not thrown (by OHC)
}
- for (SequentialWriter writer : sequentialWriters.values())
+ for (OutputStream writer : streams.values())
{
- writer.finish();
FileUtils.closeQuietly(writer);
}
}
- for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
+ for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
{
UUID cfId = entry.getKey();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 915133f..0c39469 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected Runnable runPostFlush;
private final TransactionalProxy txnProxy = txnProxy();
+ private boolean finishOnClose;
protected Descriptor descriptor;
// due to lack of multiple-inheritance, we proxy our transactional implementation
@@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
+ public SequentialWriter finishOnClose()
+ {
+ finishOnClose = true;
+ return this;
+ }
+
public void write(int value) throws ClosedChannelException
{
if (buffer == null)
@@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
@Override
public final void close()
{
- txnProxy.close();
+ if (finishOnClose)
+ txnProxy.finish();
+ else
+ txnProxy.close();
}
public final void finish()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 5b0eb8e..85c3de5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -18,10 +18,6 @@
*/
package org.apache.cassandra.utils.concurrent;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge;
* of the system should be, and so simply logging the exception is likely best (since it may have been an issue
* during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
* should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
*/
public interface Transactional extends AutoCloseable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 70993d3..bbdf4e1 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -179,10 +179,9 @@ public class DataOutputTest
{
File file = FileUtils.createTempFile("dataoutput", "test");
final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
- DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
+ DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose());
DataInput canon = testWrite(write);
write.flush();
- writer.finish();
write.close();
DataInputStream test = new DataInputStream(new FileInputStream(file));
testRead(test, canon);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ce0f918..fd38427 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.cassandra.io.util;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import com.google.common.io.Files;
import org.junit.After;
+import org.junit.Test;
import junit.framework.Assert;
@@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest
}
}
+ /**
+ * Tests that the output stream exposed by SequentialWriter behaves as expected
+ */
+ @Test
+ public void outputStream()
+ {
+ File tempFile = new File(Files.createTempDir(), "test.txt");
+ Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
+
+ try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+ {
+ os.writeUTF("123");
+ }
+ catch (IOException e)
+ {
+ Assert.fail();
+ }
+
+ Assert.assertTrue("temp file should exist", tempFile.exists());
+ }
+
}
[5/7] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by be...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Conflicts:
src/java/org/apache/cassandra/io/util/SequentialWriter.java
src/java/org/apache/cassandra/utils/concurrent/Transactional.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c27abf6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c27abf6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c27abf6
Branch: refs/heads/trunk
Commit: 1c27abf6c3f8199cd75b8b1930ce3a8285e627bb
Parents: be45eb6 a3fc425
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Aug 21 09:58:27 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:58:27 2015 +0100
----------------------------------------------------------------------
.../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
.../cassandra/io/util/SequentialWriter.java | 12 +++++++-
.../utils/concurrent/Transactional.java | 7 ++++-
.../cassandra/io/util/DataOutputTest.java | 3 +-
.../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
5 files changed, 56 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 905a5c6,0c39469..0375e23
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -170,6 -168,79 +171,12 @@@ public class SequentialWriter extends B
return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
+ public SequentialWriter finishOnClose()
+ {
+ finishOnClose = true;
+ return this;
+ }
+
- public void write(int value) throws ClosedChannelException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- if (!buffer.hasRemaining())
- {
- reBuffer();
- }
-
- buffer.put((byte) value);
-
- isDirty = true;
- syncNeeded = true;
- }
-
- public void write(byte[] buffer) throws IOException
- {
- write(buffer, 0, buffer.length);
- }
-
- public void write(byte[] data, int offset, int length) throws IOException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- int position = offset;
- int remaining = length;
- while (remaining > 0)
- {
- if (!buffer.hasRemaining())
- reBuffer();
-
- int toCopy = Math.min(remaining, buffer.remaining());
- buffer.put(data, position, toCopy);
-
- remaining -= toCopy;
- position += toCopy;
-
- isDirty = true;
- syncNeeded = true;
- }
- }
-
- public int write(ByteBuffer src) throws IOException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- int length = src.remaining();
- int finalLimit = src.limit();
- while (src.hasRemaining())
- {
- if (!buffer.hasRemaining())
- reBuffer();
-
- if (buffer.remaining() < src.remaining())
- src.limit(src.position() + buffer.remaining());
- buffer.put(src);
- src.limit(finalLimit);
-
- isDirty = true;
- syncNeeded = true;
- }
- return length;
- }
-
/**
* Synchronize file contents with disk.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c27abf6/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index f79a795,85c3de5..02562ce
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@@ -41,14 -41,14 +41,19 @@@ import static org.apache.cassandra.util
* If everything completes normally, then on exiting the try block the auto close method will invoke cleanup
* to release any temporary state/resources
*
- * No exceptions should be thrown during commit; if they are, it is not at all clear what the correct behaviour
+ * All exceptions and assertions that may be thrown should be checked and ruled out during commit preparation.
+ * Commit should generally never throw an exception unless there is a real correctness-affecting exception that
+ * cannot be moved to prepareToCommit, in which case this operation MUST be executed before any other commit
+ * methods in the object graph.
+ *
+ * If exceptions are generated by commit after this initial moment, it is not at all clear what the correct behaviour
* of the system should be, and so simply logging the exception is likely best (since it may have been an issue
- * during cleanup, say), and rollback cannot now occur.
+ * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
+ * should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
*/
public interface Transactional extends AutoCloseable
{
[7/7] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e951c98
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e951c98
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e951c98
Branch: refs/heads/trunk
Commit: 1e951c983197c696b9c456406125e8f7b5843a6b
Parents: 0fd857b 1c27abf
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Aug 21 09:58:35 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:58:35 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 29 +++++++-------
.../cassandra/db/PartitionRangeReadCommand.java | 13 ++++---
.../cassandra/db/RangeSliceVerbHandler.java | 2 +-
.../org/apache/cassandra/db/ReadCommand.java | 40 +++++++++++++++-----
.../org/apache/cassandra/db/ReadResponse.java | 30 +++++++++++++++
.../db/SinglePartitionReadCommand.java | 4 +-
.../cassandra/io/util/SequentialWriter.java | 12 +++++-
.../apache/cassandra/net/MessagingService.java | 6 +--
.../utils/concurrent/Transactional.java | 7 +++-
.../cassandra/io/util/DataOutputTest.java | 3 +-
.../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++
12 files changed, 131 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e951c98/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e6093ad,47fea8b..d03b430
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
+3.2
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+
+
3.0.0-beta1
+ * Fix throwing ReadFailure instead of ReadTimeout on range queries (CASSANDRA-10125)
* Rewrite hinted handoff (CASSANDRA-6230)
* Fix query on static compact tables (CASSANDRA-10093)
* Fix race during construction of commit log (CASSANDRA-10049)