You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/21 11:18:24 UTC
[4/7] cassandra git commit: Revert AutoSavingCache stream factory to OutputStream
Revert AutoSavingCache stream factory to OutputStream
Adds a "non-transactional" flag (finishOnClose) to SequentialWriter
so that close() finishes the write instead of aborting it, giving it plain OutputStream semantics
patch by bdeggleston; reviewed by benedict
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3fc425d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3fc425d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3fc425d
Branch: refs/heads/cassandra-3.0
Commit: a3fc425dff25c42a49af38e87aa33501d4224195
Parents: 7a85c8b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 14 12:29:27 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 21 09:55:42 2015 +0100
----------------------------------------------------------------------
.../apache/cassandra/cache/AutoSavingCache.java | 29 ++++++++++----------
.../cassandra/io/util/SequentialWriter.java | 12 +++++++-
.../utils/concurrent/Transactional.java | 8 +++---
.../cassandra/io/util/DataOutputTest.java | 3 +-
.../cassandra/io/util/SequentialWriterTest.java | 24 ++++++++++++++++
5 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 05653ba..f0f4e8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -49,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
public interface IStreamFactory
{
- public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
- public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
+ InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+ OutputStream getOutputStream(File dataPath, File crcPath) throws FileNotFoundException;
}
private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -71,9 +71,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
return ChecksummedRandomAccessReader.open(dataPath, crcPath);
}
- public SequentialWriter getOutputWriter(File dataPath, File crcPath)
+ public OutputStream getOutputStream(File dataPath, File crcPath)
{
- return SequentialWriter.open(dataPath, crcPath);
+ return SequentialWriter.open(dataPath, crcPath).finishOnClose();
}
};
@@ -254,8 +254,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
- HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
- HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+ HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
+ HashMap<UUID, OutputStream> streams = new HashMap<>();
HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
try
@@ -267,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
if (!Schema.instance.hasCF(key.getCFId()))
continue; // the table has been dropped.
- DataOutputPlus writer = dataOutputs.get(cfId);
+ DataOutputPlus writer = writers.get(cfId);
if (writer == null)
{
Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
- SequentialWriter sequentialWriter;
+ OutputStream stream;
try
{
- sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
- writer = new WrappedDataOutputStreamPlus(sequentialWriter);
+ stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
+ writer = new WrappedDataOutputStreamPlus(stream);
}
catch (FileNotFoundException e)
{
throw new RuntimeException(e);
}
paths.put(cfId, cacheFilePaths);
- sequentialWriters.put(cfId, sequentialWriter);
- dataOutputs.put(cfId, writer);
+ streams.put(cfId, stream);
+ writers.put(cfId, writer);
}
try
@@ -312,14 +312,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
// not thrown (by OHC)
}
- for (SequentialWriter writer : sequentialWriters.values())
+ for (OutputStream writer : streams.values())
{
- writer.finish();
FileUtils.closeQuietly(writer);
}
}
- for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
+ for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
{
UUID cfId = entry.getKey();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 915133f..0c39469 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -72,6 +72,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected Runnable runPostFlush;
private final TransactionalProxy txnProxy = txnProxy();
+ private boolean finishOnClose;
protected Descriptor descriptor;
// due to lack of multiple-inheritance, we proxy our transactional implementation
@@ -167,6 +168,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
+ public SequentialWriter finishOnClose()
+ {
+ finishOnClose = true;
+ return this;
+ }
+
public void write(int value) throws ClosedChannelException
{
if (buffer == null)
@@ -472,7 +479,10 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
@Override
public final void close()
{
- txnProxy.close();
+ if (finishOnClose)
+ txnProxy.finish();
+ else
+ txnProxy.close();
}
public final void finish()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 5b0eb8e..85c3de5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -18,10 +18,6 @@
*/
package org.apache.cassandra.utils.concurrent;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -49,6 +45,10 @@ import static org.apache.cassandra.utils.Throwables.merge;
* of the system should be, and so simply logging the exception is likely best (since it may have been an issue
* during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
* should be checked and ruled out during commit preparation.
+ *
+ * Since Transactional implementations will abort any changes they've made if calls to prepareToCommit() and commit()
+ * aren't made prior to calling close(), the semantics of its close() method differ significantly from
+ * most AutoCloseable implementations.
*/
public interface Transactional extends AutoCloseable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 70993d3..bbdf4e1 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -179,10 +179,9 @@ public class DataOutputTest
{
File file = FileUtils.createTempFile("dataoutput", "test");
final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
- DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
+ DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose());
DataInput canon = testWrite(write);
write.flush();
- writer.finish();
write.close();
DataInputStream test = new DataInputStream(new FileInputStream(file));
testRead(test, canon);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3fc425d/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ce0f918..fd38427 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.cassandra.io.util;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import com.google.common.io.Files;
import org.junit.After;
+import org.junit.Test;
import junit.framework.Assert;
@@ -116,4 +119,25 @@ public class SequentialWriterTest extends AbstractTransactionalTest
}
}
+ /**
+ * Tests that the output stream exposed by SequentialWriter behaves as expected
+ */
+ @Test
+ public void outputStream()
+ {
+ File tempFile = new File(Files.createTempDir(), "test.txt");
+ Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
+
+ try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+ {
+ os.writeUTF("123");
+ }
+ catch (IOException e)
+ {
+ Assert.fail();
+ }
+
+ Assert.assertTrue("temp file should exist", tempFile.exists());
+ }
+
}