You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/17 14:51:07 UTC
[5/7] cassandra git commit: Introduce Transactional API for internal
state changes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 087e57a..fa17c20 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -47,8 +47,10 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FilterFactory;
import org.apache.cassandra.utils.IFilter;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.utils.Throwables.merge;
public class BigTableWriter extends SSTableWriter
{
@@ -57,7 +59,7 @@ public class BigTableWriter extends SSTableWriter
// not very random, but the only value that can't be mistaken for a legal column-name length
public static final int END_OF_ROW = 0x0000;
- private IndexWriter iwriter;
+ private final IndexWriter iwriter;
private SegmentedFile.Builder dbuilder;
private final SequentialWriter dataFile;
private DecoratedKey lastWrittenKey;
@@ -270,47 +272,6 @@ public class BigTableWriter extends SSTableWriter
return currentPosition;
}
- /**
- * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
- */
- public void abort()
- {
- assert descriptor.type.isTemporary;
- if (iwriter == null && dataFile == null)
- return;
-
- if (iwriter != null)
- iwriter.abort();
-
- if (dataFile!= null)
- dataFile.abort();
-
- if (dbuilder != null)
- dbuilder.close();
-
- Set<Component> components = SSTable.componentsFor(descriptor);
- try
- {
- if (!components.isEmpty())
- SSTable.delete(descriptor, components);
- }
- catch (FSWriteError e)
- {
- logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
- throw e;
- }
- }
-
- // we use this method to ensure any managed data we may have retained references to during the write are no
- // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
- public void isolateReferences()
- {
- // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
- // data retention is done through copying
- first = getMinimalKey(first);
- last = lastWrittenKey = getMinimalKey(last);
- }
-
private Descriptor makeTmpLinks()
{
// create temp links if they don't already exist
@@ -323,17 +284,14 @@ public class BigTableWriter extends SSTableWriter
return link;
}
- public SSTableReader openEarly(long maxDataAge)
+ public SSTableReader openEarly()
{
- StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
- metadata.getBloomFilterFpChance(),
- repairedAt).get(MetadataType.STATS);
-
// find the max (exclusive) readable key
IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
if (boundary == null)
return null;
+ StatsMetadata stats = statsMetadata();
assert boundary.indexLength > 0 && boundary.dataLength > 0;
Descriptor link = makeTmpLinks();
// open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
@@ -343,7 +301,7 @@ public class BigTableWriter extends SSTableWriter
components, metadata,
partitioner, ifile,
dfile, iwriter.summary.build(partitioner, boundary),
- iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
+ iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY);
// now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
sstable.first = getMinimalKey(first);
@@ -351,31 +309,23 @@ public class BigTableWriter extends SSTableWriter
return sstable;
}
- public SSTableReader closeAndOpenReader()
+ public SSTableReader openFinalEarly()
{
- return closeAndOpenReader(System.currentTimeMillis());
+ // we must ensure the data is completely flushed to disk
+ dataFile.sync();
+ iwriter.indexFile.sync();
+ return openFinal(makeTmpLinks(), SSTableReader.OpenReason.EARLY);
}
- public SSTableReader closeAndOpenReader(long maxDataAge)
+ private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
{
- return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
- }
-
- public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
- {
- assert finishType != FinishType.CLOSE;
- Pair<Descriptor, StatsMetadata> p;
-
- p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt);
- Descriptor desc = p.left;
- StatsMetadata metadata = p.right;
-
- if (finishType == FinishType.EARLY)
- desc = makeTmpLinks();
+ if (maxDataAge < 0)
+ maxDataAge = System.currentTimeMillis();
+ StatsMetadata stats = statsMetadata();
// finalize in-memory state for the reader
- SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType.isFinal);
- SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType.isFinal);
+ SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX));
+ SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA));
SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
components,
this.metadata,
@@ -385,81 +335,93 @@ public class BigTableWriter extends SSTableWriter
iwriter.summary.build(partitioner),
iwriter.bf.sharedCopy(),
maxDataAge,
- metadata,
- finishType.openReason);
+ stats,
+ openReason);
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(last);
-
- if (finishType.isFinal)
- {
- iwriter.bf.close();
- iwriter.summary.close();
- // try to save the summaries to disk
- sstable.saveSummary(iwriter.builder, dbuilder);
- iwriter.builder.close();
- iwriter = null;
- dbuilder.close();
- dbuilder = null;
- }
return sstable;
}
- // Close the writer and return the descriptor to the new sstable and it's metadata
- public Pair<Descriptor, StatsMetadata> close()
+ protected SSTableWriter.TransactionalProxy txnProxy()
{
- Pair<Descriptor, StatsMetadata> ret = close(FinishType.CLOSE, this.repairedAt);
- if (dbuilder != null)
- dbuilder.close();
- if (iwriter != null)
- iwriter.builder.close();
- return ret;
+ return new TransactionalProxy();
}
- private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
+ class TransactionalProxy extends SSTableWriter.TransactionalProxy
{
- switch (type)
+ // finalise our state on disk, including renaming
+ protected void doPrepare()
{
- case EARLY: case CLOSE: case NORMAL:
- iwriter.close();
- dataFile.close();
- if (type == FinishType.CLOSE)
- iwriter.bf.close();
- }
+ Map<MetadataType, MetadataComponent> metadataComponents = finalizeMetadata();
- // write sstable statistics
- Map<MetadataType, MetadataComponent> metadataComponents;
- metadataComponents = metadataCollector
- .finalizeMetadata(partitioner.getClass().getCanonicalName(),
- metadata.getBloomFilterFpChance(),repairedAt);
+ iwriter.prepareToCommit();
- // remove the 'tmp' marker from all components
- Descriptor descriptor = this.descriptor;
- if (type.isFinal)
- {
- dataFile.writeFullChecksum(descriptor);
+ // write sstable statistics
+ dataFile.setDescriptor(descriptor).prepareToCommit();
writeMetadata(descriptor, metadataComponents);
+
// save the table of components
SSTable.appendTOC(descriptor, components);
- descriptor = rename(descriptor, components);
+
+ // rename to final
+ rename(descriptor, components);
+
+ if (openResult)
+ finalReader = openFinal(descriptor.asType(Descriptor.Type.FINAL), SSTableReader.OpenReason.NORMAL);
}
- return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ accumulate = dataFile.commit(accumulate);
+ accumulate = iwriter.commit(accumulate);
+ return accumulate;
+ }
+
+ protected Throwable doCleanup(Throwable accumulate)
+ {
+ accumulate = dbuilder.close(accumulate);
+ return accumulate;
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ accumulate = iwriter.abort(accumulate);
+ accumulate = dataFile.abort(accumulate);
+
+ accumulate = delete(descriptor, accumulate);
+ if (!openResult)
+ accumulate = delete(descriptor.asType(Descriptor.Type.FINAL), accumulate);
+ return accumulate;
+ }
+
+ private Throwable delete(Descriptor desc, Throwable accumulate)
+ {
+ try
+ {
+ Set<Component> components = SSTable.discoverComponentsFor(desc);
+ if (!components.isEmpty())
+ SSTable.delete(desc, components);
+ }
+ catch (Throwable t)
+ {
+ logger.error(String.format("Failed deleting temp components for %s", descriptor), t);
+ accumulate = merge(accumulate, t);
+ }
+ return accumulate;
+ }
}
private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
{
- SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
- try
+ File file = new File(desc.filenameFor(Component.STATS));
+ try (SequentialWriter out = SequentialWriter.open(file);)
{
desc.getMetadataSerializer().serialize(components, out.stream);
+ out.setDescriptor(desc).finish();
}
catch (IOException e)
{
- throw new FSWriteError(e, out.getPath());
- }
- finally
- {
- out.close();
+ throw new FSWriteError(e, file.getPath());
}
}
@@ -476,7 +438,7 @@ public class BigTableWriter extends SSTableWriter
/**
* Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
*/
- class IndexWriter
+ class IndexWriter extends AbstractTransactional implements Transactional
{
private final SequentialWriter indexFile;
public final SegmentedFile.Builder builder;
@@ -535,18 +497,10 @@ public class BigTableWriter extends SSTableWriter
builder.addPotentialBoundary(indexStart);
}
- public void abort()
- {
- summary.close();
- indexFile.abort();
- bf.close();
- builder.close();
- }
-
/**
* Closes the index and bloomfilter, making the public state of this writer valid for consumption.
*/
- public void close()
+ void flushBf()
{
if (components.contains(Component.FILTER))
{
@@ -566,11 +520,6 @@ public class BigTableWriter extends SSTableWriter
throw new FSWriteError(e, path);
}
}
-
- // index
- long position = indexFile.getFilePointer();
- indexFile.close(); // calls force
- FileUtils.truncate(indexFile.getPath(), position);
}
public void mark()
@@ -585,5 +534,40 @@ public class BigTableWriter extends SSTableWriter
// we assume that if that worked then we won't be trying to reset.
indexFile.resetAndTruncate(mark);
}
+
+ protected void doPrepare()
+ {
+ flushBf();
+
+ // truncate index file
+ long position = iwriter.indexFile.getFilePointer();
+ iwriter.indexFile.setDescriptor(descriptor).prepareToCommit();
+ FileUtils.truncate(iwriter.indexFile.getPath(), position);
+
+ // save summary
+ summary.prepareToCommit();
+ try (IndexSummary summary = iwriter.summary.build(partitioner))
+ {
+ SSTableReader.saveSummary(descriptor, first, last, iwriter.builder, dbuilder, summary);
+ }
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ return indexFile.commit(accumulate);
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ return indexFile.abort(accumulate);
+ }
+
+ protected Throwable doCleanup(Throwable accumulate)
+ {
+ accumulate = summary.close(accumulate);
+ accumulate = bf.close(accumulate);
+ accumulate = builder.close(accumulate);
+ return accumulate;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index e8b719e..b623e54 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -41,9 +41,8 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
// only one segment in a standard-io file
}
- public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+ public SegmentedFile complete(ChannelProxy channel, long overrideLength)
{
- assert !isFinal || overrideLength <= 0;
long length = overrideLength > 0 ? overrideLength : channel.size();
return new BufferedPoolingSegmentedFile(channel, length);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index b4d966a..2c59def 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -48,9 +48,8 @@ public class BufferedSegmentedFile extends SegmentedFile
// only one segment in a standard-io file
}
- public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+ public SegmentedFile complete(ChannelProxy channel, long overrideLength)
{
- assert !isFinal || overrideLength <= 0;
long length = overrideLength > 0 ? overrideLength : channel.size();
return new BufferedSegmentedFile(channel, length);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index d28a14d..ec68c2d 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.io.util;
import java.io.File;
import java.nio.ByteBuffer;
-import org.apache.cassandra.io.sstable.Descriptor;
-
public class ChecksummedSequentialWriter extends SequentialWriter
{
private final SequentialWriter crcWriter;
@@ -44,20 +42,36 @@ public class ChecksummedSequentialWriter extends SequentialWriter
crcMetadata.appendDirect(toAppend, false);
}
- public void writeFullChecksum(Descriptor descriptor)
+ protected class TransactionalProxy extends SequentialWriter.TransactionalProxy
{
- crcMetadata.writeFullChecksum(descriptor);
- }
+ @Override
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ return crcWriter.commit(accumulate);
+ }
- public void close()
- {
- super.close();
- crcWriter.close();
+ @Override
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ return super.doAbort(crcWriter.abort(accumulate));
+ }
+
+ @Override
+ protected void doPrepare()
+ {
+ syncInternal();
+ if (descriptor != null)
+ crcMetadata.writeFullChecksum(descriptor);
+ crcWriter.setDescriptor(descriptor).prepareToCommit();
+ // we must clean up our file handles during prepareCommit for Windows compatibility as we cannot rename an open file;
+ // TODO: once we stop file renaming, remove this for clarity
+ releaseFileHandle();
+ }
}
- public void abort()
+ @Override
+ protected SequentialWriter.TransactionalProxy txnProxy()
{
- super.abort();
- crcWriter.abort();
+ return new TransactionalProxy();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index cb30131..fdc4f61 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -96,9 +96,9 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
// only one segment in a standard-io file
}
- public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+ public SegmentedFile complete(ChannelProxy channel, long overrideLength)
{
- return new CompressedPoolingSegmentedFile(channel, metadata(channel.filePath(), overrideLength, isFinal));
+ return new CompressedPoolingSegmentedFile(channel, metadata(channel.filePath(), overrideLength));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index caf4c22..ceff7ba 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -136,18 +136,17 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
// only one segment in a standard-io file
}
- protected CompressionMetadata metadata(String path, long overrideLength, boolean isFinal)
+ protected CompressionMetadata metadata(String path, long overrideLength)
{
if (writer == null)
return CompressionMetadata.create(path);
- return writer.open(overrideLength, isFinal);
+ return writer.open(overrideLength);
}
- public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+ public SegmentedFile complete(ChannelProxy channel, long overrideLength)
{
- assert !isFinal || overrideLength <= 0;
- return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength, isFinal));
+ return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 8007039..2566952 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -40,6 +40,9 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
public class FileUtils
{
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
@@ -107,24 +110,42 @@ public class FileUtils
return createTempFile(prefix, suffix, new File(System.getProperty("java.io.tmpdir")));
}
- public static void deleteWithConfirm(String file)
+ public static Throwable deleteWithConfirm(String filePath, boolean expect, Throwable accumulate)
{
- deleteWithConfirm(new File(file));
+ return deleteWithConfirm(new File(filePath), expect, accumulate);
}
- public static void deleteWithConfirm(File file)
+ public static Throwable deleteWithConfirm(File file, boolean expect, Throwable accumulate)
{
- assert file.exists() : "attempted to delete non-existing file " + file.getName();
- if (logger.isDebugEnabled())
- logger.debug("Deleting {}", file.getName());
+ boolean exists = file.exists();
+ assert exists || !expect : "attempted to delete non-existing file " + file.getName();
try
{
- Files.delete(file.toPath());
+ if (exists)
+ Files.delete(file.toPath());
}
- catch (IOException e)
+ catch (Throwable t)
{
- throw new FSWriteError(e, file);
+ try
+ {
+ throw new FSWriteError(t, file);
+ }
+ catch (Throwable t2)
+ {
+ accumulate = merge(accumulate, t2);
+ }
}
+ return accumulate;
+ }
+
+ public static void deleteWithConfirm(String file)
+ {
+ deleteWithConfirm(new File(file));
+ }
+
+ public static void deleteWithConfirm(File file)
+ {
+ maybeFail(deleteWithConfirm(file, true, null));
}
public static void renameWithOutConfirm(String from, String to)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 57295fe..91908c9 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -183,9 +183,8 @@ public class MmappedSegmentedFile extends SegmentedFile
}
}
- public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+ public SegmentedFile complete(ChannelProxy channel, long overrideLength)
{
- assert !isFinal || overrideLength <= 0;
long length = overrideLength > 0 ? overrideLength : channel.size();
// create the segments
return new MmappedSegmentedFile(channel, length, createSegments(channel, length));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/SafeMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemory.java b/src/java/org/apache/cassandra/io/util/SafeMemory.java
index f96afcc..ad11472 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemory.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemory.java
@@ -62,6 +62,11 @@ public class SafeMemory extends Memory implements SharedCloseable
peer = 0;
}
+ public Throwable close(Throwable accumulate)
+ {
+ return ref.ensureReleased(accumulate);
+ }
+
public SafeMemory copy(long newSize)
{
SafeMemory copy = new SafeMemory(newSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 1fc374f..1096b5f 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -71,6 +71,11 @@ public class SafeMemoryWriter extends DataOutputBuffer
memory.close();
}
+ public Throwable close(Throwable accumulate)
+ {
+ return memory.close(accumulate);
+ }
+
public long length()
{
return tailOffset(memory) + buffer.position();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index cb4d132..edbd742 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
/**
* Abstracts a read-only file that has been split into segments, each of which can be represented by an independent
* FileDataInput. Allows for iteration over the FileDataInputs, or random access to the FileDataInput for a given
@@ -169,21 +171,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl
* Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
* @param channel The channel to the file on disk.
*/
- protected abstract SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal);
+ protected abstract SegmentedFile complete(ChannelProxy channel, long overrideLength);
public SegmentedFile complete(String path)
{
- return complete(getChannel(path), -1, true);
- }
-
- public SegmentedFile complete(String path, boolean isFinal)
- {
- return complete(getChannel(path), -1, isFinal);
+ return complete(getChannel(path), -1);
}
public SegmentedFile complete(String path, long overrideLength)
{
- return complete(getChannel(path), overrideLength, false);
+ return complete(getChannel(path), overrideLength);
}
public void serializeBounds(DataOutput out) throws IOException
@@ -197,10 +194,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl
throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!");
}
- public void close()
+ public Throwable close(Throwable accumulate)
{
if (channel != null)
- channel.close();
+ return channel.close(accumulate);
+ return accumulate;
+ }
+
+ public void close()
+ {
+ maybeFail(close(null));
}
private ChannelProxy getChannel(String path)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index c4fef07..d63be31 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -24,8 +24,6 @@ import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@ -34,15 +32,16 @@ import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.utils.Throwables.merge;
/**
* Adds buffering, mark, and fsyncing to OutputStream. We always fsync on close; we may also
* fsync incrementally if Config.trickle_fsync is enabled.
*/
-public class SequentialWriter extends OutputStream implements WritableByteChannel
+public class SequentialWriter extends OutputStream implements WritableByteChannel, Transactional
{
- private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
-
// isDirty - true if this.buffer contains any un-synced bytes
protected boolean isDirty = false, syncNeeded = false;
@@ -71,6 +70,55 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected Runnable runPostFlush;
+ private final TransactionalProxy txnProxy = txnProxy();
+ protected Descriptor descriptor;
+
+ // due to lack of multiple-inheritance, we proxy our transactional implementation
+ protected class TransactionalProxy extends AbstractTransactional
+ {
+ @Override
+ protected Throwable doCleanup(Throwable accumulate)
+ {
+ if (directoryFD >= 0)
+ {
+ try { CLibrary.tryCloseFD(directoryFD); }
+ catch (Throwable t) { accumulate = merge(accumulate, t); }
+ directoryFD = -1;
+ }
+
+ // close is idempotent
+ try { channel.close(); }
+ catch (Throwable t) { accumulate = merge(accumulate, t); }
+
+ if (buffer != null)
+ {
+ try { FileUtils.clean(buffer); }
+ catch (Throwable t) { accumulate = merge(accumulate, t); }
+ buffer = null;
+ }
+
+ return accumulate;
+ }
+
+ protected void doPrepare()
+ {
+ syncInternal();
+ // we must clean up our file handles during prepareCommit for Windows compatibility as we cannot rename an open file;
+ // TODO: once we stop file renaming, remove this for clarity
+ releaseFileHandle();
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ return accumulate;
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ return FileUtils.deleteWithConfirm(filePath, false, accumulate);
+ }
+ }
+
public SequentialWriter(File file, int bufferSize, boolean offheap)
{
try
@@ -383,49 +431,53 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return channel.isOpen();
}
- @Override
- public void close()
+ public SequentialWriter setDescriptor(Descriptor descriptor)
{
- if (buffer == null)
- return; // already closed
-
- syncInternal();
+ this.descriptor = descriptor;
+ return this;
+ }
- buffer = null;
+ public final void prepareToCommit()
+ {
+ txnProxy.prepareToCommit();
+ }
- cleanup(true);
+ public final Throwable commit(Throwable accumulate)
+ {
+ return txnProxy.commit(accumulate);
}
- public void abort()
+ public final Throwable abort(Throwable accumulate)
{
- cleanup(false);
+ return txnProxy.abort(accumulate);
}
- private void cleanup(boolean throwExceptions)
+ @Override
+ public final void close()
{
- if (directoryFD >= 0)
- {
- try { CLibrary.tryCloseFD(directoryFD); }
- catch (Throwable t) { handle(t, throwExceptions); }
- directoryFD = -1;
- }
+ txnProxy.close();
+ }
- // close is idempotent
- try { channel.close(); }
- catch (Throwable t) { handle(t, throwExceptions); }
+ public final void finish()
+ {
+ txnProxy.finish();
}
- private void handle(Throwable t, boolean throwExceptions)
+ protected TransactionalProxy txnProxy()
{
- if (!throwExceptions)
- logger.warn("Suppressing exception thrown while aborting writer", t);
- else
- throw new FSWriteError(t, getPath());
+ return new TransactionalProxy();
}
- // hack to make life easier for subclasses
- public void writeFullChecksum(Descriptor descriptor)
+ public void releaseFileHandle()
{
+ try
+ {
+ channel.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, filePath);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 1049d43..d4d49b3 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -122,7 +122,7 @@ public class StreamReceiveTask extends StreamTask
lockfile.create(task.sstables);
List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
- readers.add(writer.closeAndOpenReader());
+ readers.add(writer.finish(true));
lockfile.delete();
task.sstables.clear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index d3a2683..7b187ac 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -71,7 +71,7 @@ public class SSTableImport
private final boolean isSorted;
private static final JsonFactory factory = new MappingJsonFactory().configure(
- JsonParser.Feature.INTERN_FIELD_NAMES, false);
+ JsonParser.Feature.INTERN_FIELD_NAMES, false);
static
{
@@ -143,10 +143,10 @@ public class SSTableImport
else
{
assert meta.isCQL3Table() || name.hasRemaining() : "Cell name should not be empty";
- value = stringAsType((String) fields.get(1),
- meta.getValueValidator(name.hasRemaining()
- ? comparator.cellFromByteBuffer(name)
- : meta.comparator.rowMarker(Composites.EMPTY)));
+ value = stringAsType((String) fields.get(1),
+ meta.getValueValidator(name.hasRemaining()
+ ? comparator.cellFromByteBuffer(name)
+ : meta.comparator.rowMarker(Composites.EMPTY)));
}
}
}
@@ -219,10 +219,10 @@ public class SSTableImport
cfamily.addAtom(new RangeTombstone(start, end, col.timestamp, col.localExpirationTime));
continue;
}
-
+
assert cfm.isCQL3Table() || col.getName().hasRemaining() : "Cell name should not be empty";
- CellName cname = col.getName().hasRemaining() ? cfm.comparator.cellFromByteBuffer(col.getName())
- : cfm.comparator.rowMarker(Composites.EMPTY);
+ CellName cname = col.getName().hasRemaining() ? cfm.comparator.cellFromByteBuffer(col.getName())
+ : cfm.comparator.rowMarker(Composites.EMPTY);
if (col.isExpiring())
{
@@ -345,13 +345,13 @@ public class SSTableImport
break;
}
- writer.closeAndOpenReader();
+ writer.finish(true);
return importedKeys;
}
private int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath,
- IPartitioner partitioner) throws IOException
+ IPartitioner partitioner) throws IOException
{
int importedKeys = 0; // already imported keys count
long start = System.nanoTime();
@@ -377,55 +377,56 @@ public class SSTableImport
System.out.printf("Importing %s keys...%n", keyCountToImport);
parser = getParser(jsonFile); // renewing parser
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
+ try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);)
+ {
+ int lineNumber = 1;
+ DecoratedKey prevStoredKey = null;
- int lineNumber = 1;
- DecoratedKey prevStoredKey = null;
+ parser.nextToken(); // START_ARRAY
+ while (parser.nextToken() != null)
+ {
+ String key = parser.getCurrentName();
+ Map<?, ?> row = parser.readValueAs(new TypeReference<Map<?, ?>>(){});
+ DecoratedKey currentKey = partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) row.get("key")));
- parser.nextToken(); // START_ARRAY
- while (parser.nextToken() != null)
- {
- String key = parser.getCurrentName();
- Map<?, ?> row = parser.readValueAs(new TypeReference<Map<?, ?>>(){});
- DecoratedKey currentKey = partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) row.get("key")));
+ if (row.containsKey("metadata"))
+ parseMeta((Map<?, ?>) row.get("metadata"), columnFamily, null);
- if (row.containsKey("metadata"))
- parseMeta((Map<?, ?>) row.get("metadata"), columnFamily, null);
+ addColumnsToCF((List<?>) row.get("cells"), columnFamily);
- addColumnsToCF((List<?>) row.get("cells"), columnFamily);
+ if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1)
+ {
+ System.err
+ .printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n",
+ lineNumber, key);
+ return -1;
+ }
- if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1)
- {
- System.err
- .printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n",
- lineNumber, key);
- return -1;
- }
+ // saving decorated key
+ writer.append(currentKey, columnFamily);
+ columnFamily.clear();
- // saving decorated key
- writer.append(currentKey, columnFamily);
- columnFamily.clear();
+ prevStoredKey = currentKey;
+ importedKeys++;
+ lineNumber++;
- prevStoredKey = currentKey;
- importedKeys++;
- lineNumber++;
+ long current = System.nanoTime();
- long current = System.nanoTime();
+ if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
+ {
+ System.out.printf("Currently imported %d keys.%n", importedKeys);
+ start = current;
+ }
+
+ if (keyCountToImport == importedKeys)
+ break;
- if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
- {
- System.out.printf("Currently imported %d keys.%n", importedKeys);
- start = current;
}
- if (keyCountToImport == importedKeys)
- break;
+ writer.finish(true);
+ return importedKeys;
}
-
- writer.closeAndOpenReader();
-
- return importedKeys;
}
/**
@@ -511,7 +512,7 @@ public class SSTableImport
try
{
- new SSTableImport(keyCountToImport, isSorted).importJson(json, keyspace, cfamily, ssTable);
+ new SSTableImport(keyCountToImport, isSorted).importJson(json, keyspace, cfamily, ssTable);
}
catch (Exception e)
{
@@ -527,7 +528,7 @@ public class SSTableImport
private static void printProgramUsage()
{
System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n",
- SSTableImport.class.getName());
+ SSTableImport.class.getName());
System.out.println("Options:");
for (Object o : options.getOptions())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index a7f6fce..44d8f24 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -35,6 +35,11 @@ public class AlwaysPresentFilter implements IFilter
return this;
}
+ public Throwable close(Throwable accumulate)
+ {
+ return accumulate;
+ }
+
public long serializedSize() { return 0; }
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 552ca87..0a2bd28 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -29,4 +29,9 @@ public class Throwables
return existingFail;
}
+ public static void maybeFail(Throwable fail)
+ {
+ if (fail != null)
+ com.google.common.base.Throwables.propagate(fail);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 25f8510..ebabd79 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -11,12 +11,14 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
/**
* An object that needs ref counting does the two following:
* - defines a Tidy object that will cleanup once it's gone,
@@ -77,14 +79,19 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
state.release(false);
}
+ public Throwable ensureReleased(Throwable accumulate)
+ {
+ return state.ensureReleased(accumulate);
+ }
+
public void ensureReleased()
{
- state.ensureReleased();
+ maybeFail(state.ensureReleased(null));
}
public void close()
{
- state.ensureReleased();
+ ensureReleased();
}
public T get()
@@ -150,14 +157,15 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
assert released == 0;
}
- void ensureReleased()
+ Throwable ensureReleased(Throwable accumulate)
{
if (releasedUpdater.getAndSet(this, 1) == 0)
{
- globalState.release(this);
+ accumulate = globalState.release(this, accumulate);
if (DEBUG_ENABLED)
debug.deallocate();
}
+ return accumulate;
}
void release(boolean leak)
@@ -174,7 +182,7 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
}
return;
}
- globalState.release(this);
+ Throwable fail = globalState.release(this, null);
if (leak)
{
String id = this.toString();
@@ -186,6 +194,8 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
{
debug.deallocate();
}
+ if (fail != null)
+ logger.error("Error when closing {}", globalState, fail);
}
}
@@ -264,7 +274,7 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
}
// release a single reference, and cleanup if no more are extant
- void release(Ref.State ref)
+ Throwable release(Ref.State ref, Throwable accumulate)
{
locallyExtant.remove(ref);
if (-1 == counts.decrementAndGet())
@@ -276,10 +286,10 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
}
catch (Throwable t)
{
- logger.error("Error when closing {}", this, t);
- Throwables.propagate(t);
+ accumulate = merge(accumulate, t);
}
}
+ return accumulate;
}
int count()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index 1c6486e..dd65971 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -9,6 +9,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
/**
@@ -204,7 +205,10 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
public static void release(Iterable<? extends Ref<?>> refs)
{
- Throwable fail = null;
+ maybeFail(release(refs, null));
+ }
+ public static Throwable release(Iterable<? extends Ref<?>> refs, Throwable accumulate)
+ {
for (Ref ref : refs)
{
try
@@ -213,11 +217,10 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
}
catch (Throwable t)
{
- fail = merge(fail, t);
+ accumulate = merge(accumulate, t);
}
}
- if (fail != null)
- throw Throwables.propagate(fail);
+ return accumulate;
}
public static <T extends SelfRefCounted<T>> Iterable<Ref<T>> selfRefs(Iterable<T> refs)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
index 1e5a026..a3a1863 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
@@ -31,5 +31,6 @@ public interface SharedCloseable extends AutoCloseable
* Throws an exception if the shared resource has already been closed.
*/
public SharedCloseable sharedCopy();
+ public Throwable close(Throwable accumulate);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
index 0d3a843..d85fd54 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
@@ -44,4 +44,9 @@ public abstract class SharedCloseableImpl implements SharedCloseable
{
ref.ensureReleased();
}
+
+ public Throwable close(Throwable accumulate)
+ {
+ return ref.ensureReleased(accumulate);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
new file mode 100644
index 0000000..bcf5095
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -0,0 +1,198 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+/**
+ * An abstraction for Transactional behaviour. An object implementing this interface has a lifetime
+ * of the following pattern:
+ *
+ * Throwable failure = null;
+ * try (Transactional t1, t2 = ...)
+ * {
+ * // do work with t1 and t2
+ * t1.prepareToCommit();
+ * t2.prepareToCommit();
+ * failure = t1.commit(failure);
+ * failure = t2.commit(failure);
+ * }
+ * logger.error(failure);
+ *
+ * If something goes wrong before commit() is called on any transaction, then on exiting the try block
+ * the auto close method will invoke abort(), which first performs cleanup and then rolls back any state.
+ * If everything completes normally, commit() has already performed cleanup to release any temporary
+ * state/resources, so on exiting the try block the auto close method has nothing left to do.
+ *
+ * No exceptions should be thrown during commit; if they are, it is not at all clear what the correct behaviour
+ * of the system should be, and so simply logging the exception is likely best (since it may have been an issue
+ * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
+ * should be checked and ruled out during commit preparation.
+ */
+public interface Transactional extends AutoCloseable
+{
+
+ /**
+ * A simple abstract implementation of Transactional behaviour.
+ * In general this should be used as the base class for any transactional implementations.
+ *
+ * If the implementation wraps any internal Transactional objects, it must proxy every
+ * commit() and abort() call onto each internal object to ensure correct behaviour
+ */
+ public static abstract class AbstractTransactional implements Transactional
+ {
+ public static enum State
+ {
+ IN_PROGRESS,
+ READY_TO_COMMIT,
+ COMMITTED,
+ ABORTED;
+ }
+
+ private State state = State.IN_PROGRESS;
+
+ // the methods for actually performing the necessary behaviours, that are themselves protected against
+ // improper use by the external implementations provided by this class. empty default implementations
+ // could be provided, but we consider it safer to force implementers to consider explicitly their presence
+
+ protected abstract Throwable doCommit(Throwable accumulate);
+ protected abstract Throwable doAbort(Throwable accumulate);
+
+ // this only needs to perform cleanup of state unique to this instance; any internal
+ // Transactional objects will perform cleanup in the commit() or abort() calls
+ protected abstract Throwable doCleanup(Throwable accumulate);
+
+ /**
+ * Do any preparatory work prior to commit. This method should throw any exceptions that can be encountered
+ * during the finalization of the behaviour.
+ */
+ protected abstract void doPrepare();
+
+ /**
+ * commit any effects of this transaction object graph, then cleanup; delegates first to doCommit, then to doCleanup
+ */
+ public final Throwable commit(Throwable accumulate)
+ {
+ if (state != State.READY_TO_COMMIT)
+ throw new IllegalStateException("Commit attempted before prepared to commit");
+ accumulate = doCommit(accumulate);
+ accumulate = doCleanup(accumulate);
+ state = State.COMMITTED;
+ return accumulate;
+ }
+
+ /**
+ * rollback any effects of this transaction object graph; delegates first to doCleanup, then to doAbort
+ */
+ public final Throwable abort(Throwable accumulate)
+ {
+ if (state == State.ABORTED)
+ return accumulate;
+ if (state == State.COMMITTED)
+ {
+ try
+ {
+ throw new IllegalStateException("Attempted to abort a committed operation");
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
+ return accumulate;
+ }
+ state = State.ABORTED;
+ // we cleanup first so that, e.g., file handles can be released prior to deletion
+ accumulate = doCleanup(accumulate);
+ accumulate = doAbort(accumulate);
+ return accumulate;
+ }
+
+ // if we are committed or aborted, then we are done; otherwise abort
+ public final void close()
+ {
+ switch (state)
+ {
+ case COMMITTED:
+ case ABORTED:
+ break;
+ default:
+ abort();
+ }
+ }
+
+ /**
+ * The first phase of commit: delegates to doPrepare(), with valid state transition enforcement.
+ * This call should be propagated onto any child objects participating in the transaction
+ */
+ public final void prepareToCommit()
+ {
+ if (state != State.IN_PROGRESS)
+ throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS; state is " + state);
+
+ doPrepare();
+ state = State.READY_TO_COMMIT;
+ }
+
+ /**
+ * convenience method to both prepareToCommit() and commit() in one operation;
+ * only of use to the outer-most transactional object of an object graph
+ */
+ public Object finish()
+ {
+ prepareToCommit();
+ commit();
+ return this;
+ }
+
+ // convenience method wrapping abort, and throwing any exception encountered
+ // only of use to (and to be used by) outer-most object in a transactional graph
+ public final void abort()
+ {
+ maybeFail(abort(null));
+ }
+
+ // convenience method wrapping commit, and throwing any exception encountered
+ // only of use to (and to be used by) outer-most object in a transactional graph
+ public final void commit()
+ {
+ maybeFail(commit(null));
+ }
+
+ public final State state()
+ {
+ return state;
+ }
+ }
+
+ // commit should generally never throw an exception, and preferably never generate one,
+ // but if it does generate one it should accumulate it in the parameter and return the result
+ // IF a commit implementation has a real correctness affecting exception that cannot be moved to
+ // prepareToCommit, it MUST be executed before any other commit methods in the object graph
+ public Throwable commit(Throwable accumulate);
+
+ // release any resources, then rollback all state changes (unless commit() has already been invoked)
+ public Throwable abort(Throwable accumulate);
+
+ public void prepareToCommit();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 128d1b0..09121f4 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -336,7 +336,7 @@ public class ScrubTest
writer.append(Util.dk("c"), cf);
writer.append(Util.dk("y"), cf);
writer.append(Util.dk("d"), cf);
- writer.closeAndOpenReader();
+ writer.finish();
*/
String root = System.getProperty("corrupt-sstable-root");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 231b3f3..1dc72ae 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -156,11 +156,12 @@ public class AntiCompactionTest
File dir = cfs.directories.getDirectoryForNewSSTables();
String filename = cfs.getTempSSTablePath(dir);
- SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
-
- for (int i = 0; i < count * 5; i++)
- writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
- return writer.closeAndOpenReader();
+ try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0);)
+ {
+ for (int i = 0; i < count * 5; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ return writer.finish(true);
+ }
}
public void generateSStable(ColumnFamilyStore store, String Suffix)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 42ea0c7..18418e8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -240,9 +240,9 @@ public class CompactionsTest
long newSize1 = it.next().uncompressedLength();
long newSize2 = it.next().uncompressedLength();
assertEquals("candidate sstable should not be tombstone-compacted because its key range overlap with other sstable",
- originalSize1, newSize1);
+ originalSize1, newSize1);
assertEquals("candidate sstable should not be tombstone-compacted because its key range overlap with other sstable",
- originalSize2, newSize2);
+ originalSize2, newSize2);
// now let's enable the magic property
store.metadata.compactionStrategyOptions.put("unchecked_tombstone_compaction", "true");
@@ -401,21 +401,24 @@ public class CompactionsTest
cf.addColumn(Util.column("a", "a", 3));
cf.deletionInfo().add(new RangeTombstone(Util.cellname("0"), Util.cellname("b"), 2, (int) (System.currentTimeMillis()/1000)),cfmeta.comparator);
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);
-
+ try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);)
+ {
+ writer.append(Util.dk("0"), cf);
+ writer.append(Util.dk("1"), cf);
+ writer.append(Util.dk("3"), cf);
- writer.append(Util.dk("0"), cf);
- writer.append(Util.dk("1"), cf);
- writer.append(Util.dk("3"), cf);
+ cfs.addSSTable(writer.finish(true));
+ }
- cfs.addSSTable(writer.closeAndOpenReader());
- writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);
+ try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);)
+ {
+ writer.append(Util.dk("0"), cf);
+ writer.append(Util.dk("1"), cf);
+ writer.append(Util.dk("2"), cf);
+ writer.append(Util.dk("3"), cf);
+ cfs.addSSTable(writer.finish(true));
+ }
- writer.append(Util.dk("0"), cf);
- writer.append(Util.dk("1"), cf);
- writer.append(Util.dk("2"), cf);
- writer.append(Util.dk("3"), cf);
- cfs.addSSTable(writer.closeAndOpenReader());
Collection<SSTableReader> toCompact = cfs.getSSTables();
assert toCompact.size() == 2;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
index 678b926..fe04096 100644
--- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
@@ -30,7 +30,7 @@ public class RandomAccessReaderTest
SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
writer.write(expected.getBytes());
- writer.close();
+ writer.finish();
assert f.exists();
@@ -58,7 +58,7 @@ public class RandomAccessReaderTest
SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
writer.write(expected.getBytes());
- writer.close();
+ writer.finish();
assert f.exists();
@@ -87,7 +87,7 @@ public class RandomAccessReaderTest
SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
for (int i = 0; i < numIterations; i++)
writer.write(expected.getBytes());
- writer.close();
+ writer.finish();
assert f.exists();
@@ -166,7 +166,7 @@ public class RandomAccessReaderTest
SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
for (int i = 0; i < expected.length; i++)
writer.write(expected[i].getBytes());
- writer.close();
+ writer.finish();
assert f.exists();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index f4d3e87..cfc4bb8 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -81,7 +81,7 @@ public class CompressedRandomAccessReaderTest
for (int i = 0; i < 20; i++)
writer.write("x".getBytes());
- writer.close();
+ writer.finish();
CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length()));
String res = reader.readLine();
@@ -124,7 +124,7 @@ public class CompressedRandomAccessReaderTest
writer.resetAndTruncate(mark);
writer.write("brown fox jumps over the lazy dog".getBytes());
- writer.close();
+ writer.finish();
assert f.exists();
RandomAccessReader reader = compressed
@@ -161,10 +161,11 @@ public class CompressedRandomAccessReaderTest
metadata.deleteOnExit();
MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
- SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
-
- writer.write(CONTENT.getBytes());
- writer.close();
+ try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector))
+ {
+ writer.write(CONTENT.getBytes());
+ writer.finish();
+ }
ChannelProxy channel = new ChannelProxy(file);
@@ -175,8 +176,6 @@ public class CompressedRandomAccessReaderTest
RandomAccessReader reader = CompressedRandomAccessReader.open(channel, meta);
// read and verify compressed data
assertEquals(CONTENT, reader.readLine());
- // close reader
- reader.close();
Random random = new Random();
RandomAccessFile checksumModifier = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 46da343..184319f 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -17,23 +17,31 @@
*/
package org.apache.cassandra.io.compress;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Random;
+import java.util.*;
+import static org.apache.commons.io.FileUtils.readFileToByteArray;
import static org.junit.Assert.assertEquals;
+
+import org.junit.After;
import org.junit.Test;
+import junit.framework.Assert;
+import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriterTest;
-public class CompressedSequentialWriterTest
+public class CompressedSequentialWriterTest extends SequentialWriterTest
{
private ICompressor compressor;
@@ -78,30 +86,31 @@ public class CompressedSequentialWriterTest
try
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);
-
byte[] dataPre = new byte[bytesToTest];
byte[] rawPost = new byte[bytesToTest];
- Random r = new Random();
-
- // Test both write with byte[] and ByteBuffer
- r.nextBytes(dataPre);
- r.nextBytes(rawPost);
- ByteBuffer dataPost = makeBB(bytesToTest);
- dataPost.put(rawPost);
- dataPost.flip();
-
- writer.write(dataPre);
- FileMark mark = writer.mark();
-
- // Write enough garbage to transition chunk
- for (int i = 0; i < CompressionParameters.DEFAULT_CHUNK_LENGTH; i++)
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);)
{
- writer.write((byte)i);
+ Random r = new Random();
+
+ // Test both write with byte[] and ByteBuffer
+ r.nextBytes(dataPre);
+ r.nextBytes(rawPost);
+ ByteBuffer dataPost = makeBB(bytesToTest);
+ dataPost.put(rawPost);
+ dataPost.flip();
+
+ writer.write(dataPre);
+ FileMark mark = writer.mark();
+
+ // Write enough garbage to transition chunk
+ for (int i = 0; i < CompressionParameters.DEFAULT_CHUNK_LENGTH; i++)
+ {
+ writer.write((byte)i);
+ }
+ writer.resetAndTruncate(mark);
+ writer.write(dataPost);
+ writer.finish();
}
- writer.resetAndTruncate(mark);
- writer.write(dataPost);
- writer.close();
assert f.exists();
RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length()));
@@ -137,4 +146,85 @@ public class CompressedSequentialWriterTest
? ByteBuffer.allocateDirect(size)
: ByteBuffer.allocate(size);
}
+
+    // Every fixture handed out by newTest(), kept so its files can be deleted once the test ends.
+    private final List<TestableCSW> writers = new ArrayList<>();
+
+    /**
+     * Runs after each test: asks every fixture created so far to delete its
+     * files, then empties the tracking list.
+     */
+    @After
+    public void cleanup()
+    {
+        for (int i = 0; i < writers.size(); i++)
+        {
+            writers.get(i).cleanup();
+        }
+        writers.clear();
+    }
+
+    /**
+     * Builds a fresh compressed-writer fixture and records it in {@code writers}
+     * so that {@code cleanup()} can delete its files afterwards.
+     *
+     * @return the newly created fixture
+     * @throws IOException if the fixture cannot be created
+     */
+    protected TestableTransaction newTest() throws IOException
+    {
+        TestableCSW fixture = new TestableCSW();
+        writers.add(fixture);
+        return fixture;
+    }
+
+ /**
+ * Test fixture around a CompressedSequentialWriter: extends TestableSW with a second
+ * file, offsetsFile, whose path is handed to the writer as its second constructor argument.
+ * The assert* hooks check the on-disk state of both files.
+ * NOTE(review): TestableSW is not visible here; file, partialContents, fullContents and
+ * BUFFER_SIZE are presumably inherited from it or its enclosing test - confirm.
+ */
+ private static class TestableCSW extends TestableSW
+ {
+ // Second output file of the compressed writer; absent while in progress, present once prepared.
+ final File offsetsFile;
+
+ /** Creates the fixture over two files obtained from tempFile (helper defined elsewhere). */
+ private TestableCSW() throws IOException
+ {
+ this(tempFile("compressedsequentialwriter"),
+ tempFile("compressedsequentialwriter.offsets"));
+ }
+
+ /**
+ * Creates the writer under test: LZ4 compression, BUFFER_SIZE passed as the second
+ * CompressionParameters argument (assertPrepared expects the same value recorded in the
+ * offsets file), and an empty options map.
+ */
+ private TestableCSW(File file, File offsetsFile) throws IOException
+ {
+ this(file, offsetsFile, new CompressedSequentialWriter(file, offsetsFile.getPath(), new CompressionParameters(LZ4Compressor.instance, BUFFER_SIZE, new HashMap<String, String>()), new MetadataCollector(CellNames.fromAbstractType(UTF8Type.instance, false))));
+ }
+
+ /** Hands the data file and writer to the superclass and remembers the offsets file. */
+ private TestableCSW(File file, File offsetsFile, CompressedSequentialWriter sw) throws IOException
+ {
+ super(file, sw);
+ this.offsetsFile = offsetsFile;
+ }
+
+ /**
+ * In-progress state: the data file exists, the offsets file does not, and decompressing
+ * the data file yields exactly partialContents.
+ */
+ protected void assertInProgress() throws Exception
+ {
+ Assert.assertTrue(file.exists());
+ Assert.assertFalse(offsetsFile.exists());
+ byte[] compressed = readFileToByteArray(file);
+ byte[] uncompressed = new byte[partialContents.length];
+ // The last 4 bytes of the file are excluded from decompression -
+ // presumably a per-chunk checksum trailer; TODO confirm against CompressedSequentialWriter.
+ LZ4Compressor.instance.uncompress(compressed, 0, compressed.length - 4, uncompressed, 0);
+ Assert.assertTrue(Arrays.equals(partialContents, uncompressed));
+ }
+
+ /**
+ * Prepared state: both files exist. The offsets file is read strictly in order and each
+ * field is checked, then the data file is decompressed as two chunks and compared with
+ * fullContents.
+ */
+ protected void assertPrepared() throws Exception
+ {
+ Assert.assertTrue(file.exists());
+ Assert.assertTrue(offsetsFile.exists());
+ DataInputStream offsets = new DataInputStream(new ByteArrayInputStream(readFileToByteArray(offsetsFile)));
+ // The reads below must stay in this order; field meanings are inferred, not shown here.
+ // UTF string: expected to end with "LZ4Compressor" (presumably the compressor class name).
+ Assert.assertTrue(offsets.readUTF().endsWith("LZ4Compressor"));
+ // int expected to be 0 - presumably the number of compression options (empty map above).
+ Assert.assertEquals(0, offsets.readInt());
+ // int expected to equal BUFFER_SIZE - presumably the chunk length.
+ Assert.assertEquals(BUFFER_SIZE, offsets.readInt());
+ // long expected to equal the total uncompressed length written.
+ Assert.assertEquals(fullContents.length, offsets.readLong());
+ // int expected to be 2 - presumably the chunk count, matching the two uncompress calls below.
+ Assert.assertEquals(2, offsets.readInt());
+ // long expected to be 0 - presumably the file offset of the first chunk.
+ Assert.assertEquals(0, offsets.readLong());
+ // Next long is used below as the position in the data file where the second chunk starts.
+ int offset = (int) offsets.readLong();
+ byte[] compressed = readFileToByteArray(file);
+ byte[] uncompressed = new byte[fullContents.length];
+ // First chunk: bytes [0, offset - 4) of the data file, decompressed to the start of the output.
+ LZ4Compressor.instance.uncompress(compressed, 0, offset - 4, uncompressed, 0);
+ // Second chunk: from offset to 4 bytes before end of file, decompressed after the first
+ // partialContents.length output bytes. The skipped 4 bytes per chunk are presumably a checksum.
+ LZ4Compressor.instance.uncompress(compressed, offset, compressed.length - (4 + offset), uncompressed, partialContents.length);
+ Assert.assertTrue(Arrays.equals(fullContents, uncompressed));
+ }
+
+ /** Aborted state: the superclass checks pass and the offsets file does not exist. */
+ protected void assertAborted() throws Exception
+ {
+ super.assertAborted();
+ Assert.assertFalse(offsetsFile.exists());
+ }
+
+ /** Deletes the data and offsets files; the boolean results of delete() are ignored. */
+ void cleanup()
+ {
+ file.delete();
+ offsetsFile.delete();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
new file mode 100644
index 0000000..dfb55a1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -0,0 +1,130 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+
+import junit.framework.Assert;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+
+/**
+ * Plugs an SSTableWriter-backed fixture into AbstractTransactionalTest and checks which
+ * sstable component files exist on disk at each point of the writer's lifecycle.
+ * NOTE(review): AbstractTransactionalTest is not shown here; presumably it drives the
+ * fixture through the in-progress / prepared / committed / aborted states and calls the
+ * matching assert* hook after each transition - confirm against the base class.
+ */
+public class BigTableWriterTest extends AbstractTransactionalTest
+{
+    public static final String KEYSPACE1 = "BigTableWriterTest";
+    public static final String CF_STANDARD = "Standard1";
+
+    // Column family store of KEYSPACE1/CF_STANDARD; assigned once in defineSchema().
+    private static ColumnFamilyStore cfs;
+
+    /**
+     * Prepares the server, creates the test keyspace (SimpleStrategy, RF 1) with a single
+     * standard column family, and caches that column family's store in {@code cfs}.
+     */
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
+        cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+    }
+
+    /**
+     * @return a new fixture wrapping a freshly created writer that already holds data
+     * @throws IOException if the writer cannot be created
+     */
+    protected TestableTransaction newTest() throws IOException
+    {
+        return new TestableBTW();
+    }
+
+    /**
+     * Fixture wrapping one SSTableWriter. The constructor appends 100 partitions so that the
+     * writer has produced on-disk output before any state assertion runs.
+     */
+    private static class TestableBTW extends TestableTransaction
+    {
+        // Path returned by cfs.getTempSSTablePath; its length is checked in assertInProgress().
+        final File file;
+        // Descriptor parsed from the same path; used to derive TEMP/FINAL component file names.
+        final Descriptor descriptor;
+        final SSTableWriter writer;
+
+        /** Creates the fixture at a temporary sstable path in the directory for new sstables. */
+        private TestableBTW() throws IOException
+        {
+            this(cfs.getTempSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
+        }
+
+        /** Creates the writer for {@code file} via SSTableWriter.create(file, 0, 0). */
+        private TestableBTW(String file) throws IOException
+        {
+            this(file, SSTableWriter.create(file, 0, 0));
+        }
+
+        /**
+         * Registers the writer with the superclass, then appends the same 10-cell row
+         * under 100 distinct keys (the ints 0..99).
+         */
+        private TestableBTW(String file, SSTableWriter sw) throws IOException
+        {
+            super(sw);
+            this.file = new File(file);
+            this.descriptor = Descriptor.fromFilename(file);
+            this.writer = sw;
+            ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+            for (int i = 0; i < 10; i++)
+                cf.addColumn(Util.cellname(i), SSTableRewriterTest.random(0, 1000), 1);
+            for (int i = 0; i < 100; i++)
+                writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        }
+
+        /**
+         * In progress: only the TEMP data and primary-index components exist, no FINAL
+         * component exists, and the file at the writer's path is non-empty.
+         */
+        protected void assertInProgress() throws Exception
+        {
+            assertExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX);
+            assertNotExists(Descriptor.Type.TEMP, Component.FILTER, Component.SUMMARY);
+            assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            Assert.assertTrue(file.length() > 0);
+        }
+
+        /** Prepared: all four checked components exist as FINAL and none remain as TEMP. */
+        protected void assertPrepared() throws Exception
+        {
+            assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            assertExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+        }
+
+        /** Aborted: no TEMP or FINAL component is left and the writer's file is gone. */
+        protected void assertAborted() throws Exception
+        {
+            assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            Assert.assertFalse(file.exists());
+        }
+
+        /** Committed: the on-disk layout is the same as in the prepared state. */
+        protected void assertCommitted() throws Exception
+        {
+            assertPrepared();
+        }
+
+        /**
+         * Asserts that every listed component file exists for the given descriptor type.
+         * The failure message names the type and component, matching assertNotExists,
+         * so a failing run shows which file was missing.
+         */
+        private void assertExists(Descriptor.Type type, Component ... components)
+        {
+            for (Component component : components)
+                Assert.assertTrue(type.toString() + " " + component.toString(), new File(descriptor.asType(type).filenameFor(component)).exists());
+        }
+
+        /** Asserts that none of the listed component files exists for the given descriptor type. */
+        private void assertNotExists(Descriptor.Type type, Component ... components)
+        {
+            for (Component component : components)
+                Assert.assertFalse(type.toString() + " " + component.toString(), new File(descriptor.asType(type).filenameFor(component)).exists());
+        }
+    }
+
+}