You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/17 14:51:05 UTC
[3/7] cassandra git commit: Introduce Transactional API for internal state changes
Introduce Transactional API for internal state changes
patch by benedict; reviewed by josh for CASSANDRA-8984
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8704006b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8704006b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8704006b
Branch: refs/heads/cassandra-2.2
Commit: 8704006bfa75a78cb904e35662e4c8bafc1f2330
Parents: ff10d63
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun May 17 13:50:03 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun May 17 13:50:03 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 14 +-
.../db/compaction/CompactionManager.java | 24 +-
.../cassandra/db/compaction/CompactionTask.java | 31 +--
.../cassandra/db/compaction/Scrubber.java | 21 +-
.../cassandra/db/compaction/Upgrader.java | 11 +-
.../writers/CompactionAwareWriter.java | 42 +++-
.../writers/DefaultCompactionWriter.java | 18 +-
.../writers/MajorLeveledCompactionWriter.java | 28 +--
.../writers/MaxSSTableSizeWriter.java | 16 +-
.../SplittingSizeTieredCompactionWriter.java | 17 +-
.../io/compress/CompressedSequentialWriter.java | 59 ++---
.../io/compress/CompressionMetadata.java | 131 +++++-----
.../cassandra/io/sstable/IndexSummary.java | 11 +-
.../io/sstable/IndexSummaryBuilder.java | 13 +-
.../apache/cassandra/io/sstable/SSTable.java | 2 +-
.../cassandra/io/sstable/SSTableRewriter.java | 250 +++++++++----------
.../io/sstable/SSTableSimpleUnsortedWriter.java | 56 +++--
.../io/sstable/SSTableSimpleWriter.java | 9 +-
.../io/sstable/format/SSTableReader.java | 13 +-
.../io/sstable/format/SSTableWriter.java | 137 +++++++---
.../io/sstable/format/big/BigTableWriter.java | 248 +++++++++---------
.../io/util/BufferedPoolingSegmentedFile.java | 3 +-
.../io/util/BufferedSegmentedFile.java | 3 +-
.../io/util/ChecksummedSequentialWriter.java | 38 ++-
.../io/util/CompressedPoolingSegmentedFile.java | 4 +-
.../io/util/CompressedSegmentedFile.java | 9 +-
.../org/apache/cassandra/io/util/FileUtils.java | 39 ++-
.../cassandra/io/util/MmappedSegmentedFile.java | 3 +-
.../apache/cassandra/io/util/SafeMemory.java | 5 +
.../cassandra/io/util/SafeMemoryWriter.java | 5 +
.../apache/cassandra/io/util/SegmentedFile.java | 23 +-
.../cassandra/io/util/SequentialWriter.java | 116 ++++++---
.../cassandra/streaming/StreamReceiveTask.java | 2 +-
.../apache/cassandra/tools/SSTableImport.java | 97 +++----
.../cassandra/utils/AlwaysPresentFilter.java | 5 +
.../org/apache/cassandra/utils/Throwables.java | 5 +
.../apache/cassandra/utils/concurrent/Ref.java | 28 ++-
.../apache/cassandra/utils/concurrent/Refs.java | 11 +-
.../utils/concurrent/SharedCloseable.java | 1 +
.../utils/concurrent/SharedCloseableImpl.java | 5 +
.../utils/concurrent/Transactional.java | 198 +++++++++++++++
.../unit/org/apache/cassandra/db/ScrubTest.java | 2 +-
.../db/compaction/AntiCompactionTest.java | 11 +-
.../db/compaction/CompactionsTest.java | 31 +--
.../cassandra/io/RandomAccessReaderTest.java | 8 +-
.../CompressedRandomAccessReaderTest.java | 15 +-
.../CompressedSequentialWriterTest.java | 136 ++++++++--
.../io/sstable/BigTableWriterTest.java | 130 ++++++++++
.../io/sstable/SSTableRewriterTest.java | 154 +++++-------
.../cassandra/io/sstable/SSTableUtils.java | 2 +-
.../io/util/BufferedRandomAccessFileTest.java | 28 +--
.../util/ChecksummedSequentialWriterTest.java | 92 +++++++
.../cassandra/io/util/DataOutputTest.java | 1 +
.../cassandra/io/util/SequentialWriterTest.java | 117 +++++++++
.../compress/CompressedInputStreamTest.java | 2 +-
.../cassandra/tools/SSTableExportTest.java | 16 +-
.../concurrent/AbstractTransactionalTest.java | 136 ++++++++++
58 files changed, 1731 insertions(+), 902 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3b25b5..9f14fba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.0-beta1
+ * Introduce Transactional API for internal state changes (CASSANDRA-8984)
* Add a flag in cassandra.yaml to enable UDFs (CASSANDRA-9404)
* Better support of null for UDF (CASSANDRA-8374)
* Use ecj instead of javassist for UDFs (CASSANDRA-8241)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index ef47aba..3509b27 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.base.Throwables;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -341,8 +340,7 @@ public class Memtable
SSTableReader ssTable;
// errors when creating the writer that may leave empty temp files.
- SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory));
- try
+ try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
{
boolean trackContention = logger.isDebugEnabled();
int heavilyContendedRowCount = 0;
@@ -372,16 +370,13 @@ public class Memtable
if (writer.getFilePointer() > 0)
{
- writer.isolateReferences();
-
// temp sstables should contain non-repaired data.
- ssTable = writer.closeAndOpenReader();
+ ssTable = writer.finish(true);
logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
ssTable.getFilename(), new File(ssTable.getFilename()).length(), context));
}
else
{
- writer.abort();
ssTable = null;
logger.info("Completed flushing; nothing needed to be retained. Commitlog position was {}",
context);
@@ -392,11 +387,6 @@ public class Memtable
return ssTable;
}
- catch (Throwable e)
- {
- writer.abort();
- throw Throwables.propagate(e);
- }
}
public SSTableWriter createFlushWriter(String filename)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 35e288d..fc83cc5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -42,7 +41,6 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
@@ -787,9 +785,9 @@ public class CompactionManager implements CompactionManagerMBean
metrics.beginCompaction(ci);
Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
- SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
List<SSTableReader> finished;
- try (CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
+ CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
{
writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
@@ -813,11 +811,6 @@ public class CompactionManager implements CompactionManagerMBean
finished = writer.finish();
cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP);
}
- catch (Throwable e)
- {
- writer.abort();
- throw Throwables.propagate(e);
- }
finally
{
scanner.close();
@@ -1178,13 +1171,12 @@ public class CompactionManager implements CompactionManagerMBean
Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
-
long repairedKeyCount = 0;
long unrepairedKeyCount = 0;
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
+ try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
+ AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
{
int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
@@ -1221,8 +1213,8 @@ public class CompactionManager implements CompactionManagerMBean
// we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
// so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
List<SSTableReader> anticompactedSSTables = new ArrayList<>();
- anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
+ anticompactedSSTables.addAll(repairedSSTableWriter.setRepairedAt(repairedAt).finish());
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.setRepairedAt(ActiveRepairService.UNREPAIRED_SSTABLE).finish());
cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
@@ -1236,8 +1228,6 @@ public class CompactionManager implements CompactionManagerMBean
{
JVMStabilityInspector.inspectThrowable(e);
logger.error("Error anticompacting " + anticompactionGroup, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
}
return 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f472711..c397d9a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -167,20 +167,12 @@ public class CompactionTask extends AbstractCompactionTask
if (collector != null)
collector.beginCompaction(ci);
long lastCheckObsoletion = start;
- CompactionAwareWriter writer = null;
- try
+
+ if (!controller.cfs.getCompactionStrategy().isActive)
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, sstables, actuallyCompact))
{
- if (!controller.cfs.getCompactionStrategy().isActive)
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markObsolete(sstables, compactionType);
- return;
- }
- writer = getCompactionAwareWriter(cfs, sstables, actuallyCompact);
estimatedKeys = writer.estimatedKeys();
while (iter.hasNext())
{
@@ -201,19 +193,6 @@ public class CompactionTask extends AbstractCompactionTask
// don't replace old sstables yet, as we need to mark the compaction finished in the system table
newSStables = writer.finish();
}
- catch (Throwable t)
- {
- try
- {
- if (writer != null)
- writer.abort();
- }
- catch (Throwable t2)
- {
- t.addSuppressed(t2);
- }
- throw t;
- }
finally
{
// point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 691566e..29472b3 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -127,8 +127,8 @@ public class Scrubber implements Closeable
{
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
- SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
- try
+
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);)
{
nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
{
@@ -271,26 +271,27 @@ public class Scrubber implements Closeable
{
// out of order rows, but no bad rows found - we can keep our repairedAt time
long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
- SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
- for (Row row : outOfOrderRows)
- inOrderWriter.append(row.key, row.cf);
- newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+ try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);)
+ {
+ for (Row row : outOfOrderRows)
+ inOrderWriter.append(row.key, row.cf);
+ newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true);
+ }
if (!isOffline)
cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
}
// finish obsoletes the old sstable
- List<SSTableReader> finished = writer.finish(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+ List<SSTableReader> finished = writer.setRepairedAt(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt).finish();
if (!finished.isEmpty())
newSstable = finished.get(0);
if (!isOffline)
cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB);
}
- catch (Throwable t)
+ catch (IOException e)
{
- writer.abort();
- throw Throwables.propagate(t);
+ throw Throwables.propagate(e);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 75964e1..30584fd 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -81,8 +81,9 @@ public class Upgrader
{
outputHandler.output("Upgrading " + sstable);
Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
- SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
- try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
+
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+ AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
{
Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
@@ -94,12 +95,6 @@ public class Upgrader
writer.finish();
outputHandler.output("Upgrade of " + sstable + " complete.");
-
- }
- catch (Throwable t)
- {
- writer.abort();
- throw Throwables.propagate(t);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 2903ced..fe43186 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -25,28 +25,32 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.CompactionTask;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.concurrent.Transactional;
/**
* Class that abstracts away the actual writing of files to make it possible to use CompactionTask for more
* use cases.
*/
-public abstract class CompactionAwareWriter
+public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
{
protected final ColumnFamilyStore cfs;
protected final Set<SSTableReader> nonExpiredSSTables;
protected final long estimatedTotalKeys;
protected final long maxAge;
protected final long minRepairedAt;
+ protected final SSTableRewriter sstableWriter;
- public CompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> nonExpiredSSTables)
+ public CompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline)
{
this.cfs = cfs;
this.nonExpiredSSTables = nonExpiredSSTables;
this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
+ this.sstableWriter = new SSTableRewriter(cfs, allSSTables, maxAge, offline);
}
/**
@@ -56,16 +60,40 @@ public abstract class CompactionAwareWriter
*/
public abstract boolean append(AbstractCompactedRow row);
- /**
- * abort the compaction writer - make sure that all files created are removed etc
- */
- public abstract void abort();
+ @Override
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ return sstableWriter.abort(accumulate);
+ }
+
+ @Override
+ protected Throwable doCleanup(Throwable accumulate)
+ {
+ return accumulate;
+ }
+
+ @Override
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ return sstableWriter.commit(accumulate);
+ }
+
+ @Override
+ protected void doPrepare()
+ {
+ sstableWriter.prepareToCommit();
+ }
/**
* we are done, return the finished sstables so that the caller can mark the old ones as compacted
* @return all the written sstables sstables
*/
- public abstract List<SSTableReader> finish();
+ @Override
+ public List<SSTableReader> finish()
+ {
+ super.finish();
+ return sstableWriter.finished();
+ }
/**
* estimated number of keys we should write
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index d51c82d..3589b54 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
/**
* The default compaction writer - creates one output file in L0
@@ -41,13 +43,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
public class DefaultCompactionWriter extends CompactionAwareWriter
{
protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
- private final SSTableRewriter sstableWriter;
public DefaultCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
{
- super(cfs, nonExpiredSSTables);
+ super(cfs, allSSTables, nonExpiredSSTables, offline);
logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
- sstableWriter = new SSTableRewriter(cfs, allSSTables, maxAge, offline);
long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
@@ -66,18 +66,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
}
@Override
- public void abort()
- {
- sstableWriter.abort();
- }
-
- @Override
- public List<SSTableReader> finish()
- {
- return sstableWriter.finish();
- }
-
- @Override
public long estimatedKeys()
{
return estimatedTotalKeys;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index b2f8fe1..d48140e 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
public class MajorLeveledCompactionWriter extends CompactionAwareWriter
{
private static final Logger logger = LoggerFactory.getLogger(MajorLeveledCompactionWriter.class);
- private final SSTableRewriter rewriter;
private final long maxSSTableSize;
private final long expectedWriteSize;
private final Set<SSTableReader> allSSTables;
@@ -53,10 +52,9 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
{
- super(cfs, nonExpiredSSTables);
+ super(cfs, allSSTables, nonExpiredSSTables, offline);
this.maxSSTableSize = maxSSTableSize;
this.allSSTables = allSSTables;
- rewriter = new SSTableRewriter(cfs, allSSTables, CompactionTask.getMaxDataAge(nonExpiredSSTables), offline);
expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType));
long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
@@ -72,17 +70,17 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
cfs.metadata,
cfs.partitioner,
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors));
- rewriter.switchWriter(writer);
+ sstableWriter.switchWriter(writer);
}
@Override
public boolean append(AbstractCompactedRow row)
{
- long posBefore = rewriter.currentWriter().getOnDiskFilePointer();
- RowIndexEntry rie = rewriter.append(row);
- totalWrittenInLevel += rewriter.currentWriter().getOnDiskFilePointer() - posBefore;
+ long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();
+ RowIndexEntry rie = sstableWriter.append(row);
+ totalWrittenInLevel += sstableWriter.currentWriter().getOnDiskFilePointer() - posBefore;
partitionsWritten++;
- if (rewriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
+ if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
{
if (totalWrittenInLevel > LeveledManifest.maxBytesForLevel(currentLevel, maxSSTableSize))
{
@@ -98,23 +96,11 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
cfs.metadata,
cfs.partitioner,
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors));
- rewriter.switchWriter(writer);
+ sstableWriter.switchWriter(writer);
partitionsWritten = 0;
sstablesWritten++;
}
return rie != null;
}
-
- @Override
- public void abort()
- {
- rewriter.abort();
- }
-
- @Override
- public List<SSTableReader> finish()
- {
- return rewriter.finish();
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 1a99059..ab24bf8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -36,7 +36,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
{
private final long estimatedTotalKeys;
private final long expectedWriteSize;
- private final SSTableRewriter sstableWriter;
private final long maxSSTableSize;
private final int level;
private final long estimatedSSTables;
@@ -44,7 +43,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
public MaxSSTableSizeWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
{
- super(cfs, nonExpiredSSTables);
+ super(cfs, allSSTables, nonExpiredSSTables, offline);
this.allSSTables = allSSTables;
this.level = level;
this.maxSSTableSize = maxSSTableSize;
@@ -52,7 +51,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
expectedWriteSize = Math.min(maxSSTableSize, totalSize);
estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
- sstableWriter = new SSTableRewriter(cfs, allSSTables, CompactionTask.getMaxDataAge(nonExpiredSSTables), offline);
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
estimatedTotalKeys / estimatedSSTables,
@@ -83,18 +81,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
}
@Override
- public void abort()
- {
- sstableWriter.abort();
- }
-
- @Override
- public List<SSTableReader> finish()
- {
- return sstableWriter.finish();
- }
-
- @Override
public long estimatedKeys()
{
return estimatedTotalKeys;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index c97270c..2a452c7 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -48,7 +48,6 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
public static final long DEFAULT_SMALLEST_SSTABLE_BYTES = 50_000_000;
private final double[] ratios;
- private final SSTableRewriter sstableWriter;
private final long totalSize;
private final Set<SSTableReader> allSSTables;
private long currentBytesToWrite;
@@ -61,7 +60,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
{
- super(cfs, nonExpiredSSTables);
+ super(cfs, allSSTables, nonExpiredSSTables, false);
this.allSSTables = allSSTables;
totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
double[] potentialRatios = new double[20];
@@ -83,7 +82,6 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
}
}
ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
- sstableWriter = new SSTableRewriter(cfs, allSSTables, CompactionTask.getMaxDataAge(nonExpiredSSTables), false);
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
@@ -119,17 +117,4 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
}
return rie != null;
}
-
-
- @Override
- public void abort()
- {
- sstableWriter.abort();
- }
-
- @Override
- public List<SSTableReader> finish()
- {
- return sstableWriter.finish();
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eb9dcf8..6218526 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -28,17 +28,12 @@ import java.util.zip.Adler32;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.FINAL;
-import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED;
-import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED_FINAL;
-
public class CompressedSequentialWriter extends SequentialWriter
{
private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
@@ -97,12 +92,6 @@ public class CompressedSequentialWriter extends SequentialWriter
}
@Override
- public void sync()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void flush()
{
throw new UnsupportedOperationException();
@@ -163,13 +152,11 @@ public class CompressedSequentialWriter extends SequentialWriter
runPostFlush.run();
}
- public CompressionMetadata open(long overrideLength, boolean isFinal)
+ public CompressionMetadata open(long overrideLength)
{
if (overrideLength <= 0)
- return metadataWriter.open(uncompressedSize, chunkOffset, isFinal ? FINAL : SHARED_FINAL);
- // we are early opening the file, make sure we open metadata with the correct size
- assert !isFinal;
- return metadataWriter.open(overrideLength, chunkOffset, SHARED);
+ overrideLength = uncompressedSize;
+ return metadataWriter.open(overrideLength, chunkOffset);
}
@Override
@@ -279,36 +266,36 @@ public class CompressedSequentialWriter extends SequentialWriter
}
}
- @Override
- public void close()
+ protected class TransactionalProxy extends SequentialWriter.TransactionalProxy
{
- if (buffer == null)
- return;
-
- long finalPosition = current();
-
- super.close();
- sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
- try
+ @Override
+ protected Throwable doCommit(Throwable accumulate)
{
- metadataWriter.close(finalPosition, chunkCount);
+ return metadataWriter.commit(accumulate);
}
- catch (IOException e)
+
+ @Override
+ protected Throwable doAbort(Throwable accumulate)
{
- throw new FSWriteError(e, getPath());
+ return super.doAbort(metadataWriter.abort(accumulate));
}
- }
- public void abort()
- {
- super.abort();
- metadataWriter.abort();
+ @Override
+ protected void doPrepare()
+ {
+ syncInternal();
+ if (descriptor != null)
+ crcMetadata.writeFullChecksum(descriptor);
+ releaseFileHandle();
+ sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
+ metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
+ }
}
@Override
- public void writeFullChecksum(Descriptor descriptor)
+ protected SequentialWriter.TransactionalProxy txnProxy()
{
- crcMetadata.writeFullChecksum(descriptor);
+ return new TransactionalProxy();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 928541a..a6c7a8b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -36,9 +36,9 @@ import java.util.SortedSet;
import java.util.TreeSet;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
import com.google.common.primitives.Longs;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
@@ -47,12 +47,12 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.Memory;
import org.apache.cassandra.io.util.SafeMemory;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Transactional;
/**
* Holds metadata about compressed file
@@ -265,7 +265,7 @@ public class CompressionMetadata
chunkOffsets.close();
}
- public static class Writer
+ public static class Writer extends Transactional.AbstractTransactional implements Transactional
{
// path to the file
private final CompressionParameters parameters;
@@ -274,6 +274,8 @@ public class CompressionMetadata
private SafeMemory offsets = new SafeMemory(maxCount * 8L);
private int count = 0;
+ // provided by the user via finalizeLength(), once all data has been written
+ private long dataLength, chunkCount;
private Writer(CompressionParameters parameters, String path)
{
@@ -321,61 +323,60 @@ public class CompressionMetadata
}
}
- static enum OpenType
+ // we've written everything; wire up some final metadata state
+ public Writer finalizeLength(long dataLength, int chunkCount)
{
- // i.e. FinishType == EARLY; we will use the RefCountedMemory in possibly multiple instances
- SHARED,
- // i.e. FinishType == EARLY, but the sstable has been completely written, so we can
- // finalise the contents and size of the memory, but must retain a reference to it
- SHARED_FINAL,
- // i.e. FinishType == NORMAL or FINISH_EARLY, i.e. we have actually finished writing the table
- // and will never need to open the metadata again, so we can release any references to it here
- FINAL
+ this.dataLength = dataLength;
+ this.chunkCount = chunkCount;
+ return this;
}
- public CompressionMetadata open(long dataLength, long compressedLength, OpenType type)
+ public void doPrepare()
{
- SafeMemory offsets;
- int count = this.count;
- switch (type)
+ assert chunkCount == count;
+
+ // finalize the size of memory used if it won't now change;
+ // unnecessary if already correct size
+ if (offsets.size() != count * 8L)
+ {
+ SafeMemory tmp = offsets;
+ offsets = offsets.copy(count * 8L);
+ tmp.free();
+ }
+
+ // flush the data to disk
+ DataOutputStream out = null;
+ try
+ {
+ out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
+ writeHeader(out, dataLength, count);
+ for (int i = 0 ; i < count ; i++)
+ out.writeLong(offsets.getLong(i * 8L));
+ }
+ catch (IOException e)
{
- case FINAL: case SHARED_FINAL:
- if (this.offsets.size() != count * 8L)
- {
- // finalize the size of memory used if it won't now change;
- // unnecessary if already correct size
- SafeMemory tmp = this.offsets.copy(count * 8L);
- this.offsets.free();
- this.offsets = tmp;
- }
-
- if (type == OpenType.SHARED_FINAL)
- {
- offsets = this.offsets.sharedCopy();
- }
- else
- {
- offsets = this.offsets;
- // null out our reference to the original shared data to catch accidental reuse
- // note that since noone is writing to this Writer while we open it, null:ing out this.offsets is safe
- this.offsets = null;
- }
- break;
-
- case SHARED:
- offsets = this.offsets.sharedCopy();
- // we should only be opened on a compression data boundary; truncate our size to this boundary
- count = (int) (dataLength / parameters.chunkLength());
- if (dataLength % parameters.chunkLength() != 0)
- count++;
- // grab our actual compressed length from the next offset from our the position we're opened to
- if (count < this.count)
- compressedLength = offsets.getLong(count * 8L);
- break;
-
- default:
- throw new AssertionError();
+ throw Throwables.propagate(e);
}
+ finally
+ {
+ FileUtils.closeQuietly(out);
+ }
+ }
+
+ public CompressionMetadata open(long dataLength, long compressedLength)
+ {
+ SafeMemory offsets = this.offsets.sharedCopy();
+
+ // calculate how many entries we need, if our dataLength is truncated
+ int count = (int) (dataLength / parameters.chunkLength());
+ if (dataLength % parameters.chunkLength() != 0)
+ count++;
+
+ assert count > 0;
+ // grab our actual compressed length from the next offset after the position we're opened to
+ if (count < this.count)
+ compressedLength = offsets.getLong(count * 8L);
+
return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength);
}
@@ -402,27 +403,19 @@ public class CompressionMetadata
count = chunkIndex;
}
- public void close(long dataLength, int chunks) throws IOException
+ protected Throwable doCleanup(Throwable failed)
{
- DataOutputStream out = null;
- try
- {
- out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
- assert chunks == count;
- writeHeader(out, dataLength, chunks);
- for (int i = 0 ; i < count ; i++)
- out.writeLong(offsets.getLong(i * 8L));
- }
- finally
- {
- FileUtils.closeQuietly(out);
- }
+ return offsets.close(failed);
}
- public void abort()
+ protected Throwable doCommit(Throwable accumulate)
{
- if (offsets != null)
- offsets.close();
+ return accumulate;
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ return FileUtils.deleteWithConfirm(filePath, false, accumulate);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index fbefe13..59c5eef 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -18,16 +18,20 @@
package org.apache.cassandra.io.sstable;
import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.io.util.MemoryOutputStream;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -46,6 +50,7 @@ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
*/
public class IndexSummary extends WrappedSharedCloseable
{
+ private static final Logger logger = LoggerFactory.getLogger(IndexSummary.class);
public static final IndexSummarySerializer serializer = new IndexSummarySerializer();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index c7c51e5..12e41c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -203,12 +203,16 @@ public class IndexSummaryBuilder implements AutoCloseable
}
}
- public IndexSummary build(IPartitioner partitioner)
+ public void prepareToCommit()
{
// this method should only be called when we've finished appending records, so we truncate the
// memory we're using to the exact amount required to represent it before building our summary
entries.setCapacity(entries.length());
offsets.setCapacity(offsets.length());
+ }
+
+ public IndexSummary build(IPartitioner partitioner)
+ {
return build(partitioner, null);
}
@@ -240,6 +244,13 @@ public class IndexSummaryBuilder implements AutoCloseable
offsets.close();
}
+ public Throwable close(Throwable accumulate)
+ {
+ accumulate = entries.close(accumulate);
+ accumulate = offsets.close(accumulate);
+ return accumulate;
+ }
+
public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
{
return (int) Math.ceil((samplingLevel * maxSummarySize) / (double) BASE_SAMPLING_LEVEL);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index f486b78..bc3486a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -195,7 +195,7 @@ public abstract class SSTable
}
}
- private static Set<Component> discoverComponentsFor(Descriptor desc)
+ public static Set<Component> discoverComponentsFor(Descriptor desc)
{
Set<Component.Type> knownTypes = Sets.difference(Component.TYPES, Collections.singleton(Component.Type.CUSTOM));
Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 8890659..a526ec9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.io.sstable;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -33,6 +32,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Refs;
+import org.apache.cassandra.utils.concurrent.Transactional;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -51,7 +52,7 @@ import static org.apache.cassandra.utils.Throwables.merge;
* but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished
* successfully.
*/
-public class SSTableRewriter
+public class SSTableRewriter extends Transactional.AbstractTransactional implements Transactional
{
private static long preemptiveOpenInterval;
static
@@ -77,7 +78,9 @@ public class SSTableRewriter
private final ColumnFamilyStore cfs;
private final long maxAge;
- private final List<SSTableReader> finished = new ArrayList<>();
+ private long repairedAt = -1;
+ // the set of final readers we will expose on commit
+ private final List<SSTableReader> preparedForCommit = new ArrayList<>();
private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
@@ -85,21 +88,18 @@ public class SSTableRewriter
private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
- private final List<SSTableReader> finishedReaders = new ArrayList<>();
- private final Queue<Finished> finishedEarly = new ArrayDeque<>();
- // as writers are closed from finishedEarly, their last readers are moved
- // into discard, so that abort can cleanup after us safely
- private final List<SSTableReader> discard = new ArrayList<>();
- private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+ private final List<Finished> finishedWriters = new ArrayList<>();
+ // as writers are closed from finishedWriters, their last readers are moved into discard, so that abort can cleanup
+ // after us safely; we use a set so we can add in both prepareToCommit and abort
+ private final Set<SSTableReader> discard = new HashSet<>();
+ // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+ private final boolean isOffline;
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
- private State state = State.WORKING;
- private static enum State
- {
- WORKING, FINISHED, ABORTED
- }
+ // for testing (TODO: remove when have byteman setup)
+ private boolean throwEarly, throwLate;
public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
{
@@ -178,7 +178,7 @@ public class SSTableRewriter
}
else
{
- SSTableReader reader = writer.openEarly(maxAge);
+ SSTableReader reader = writer.setMaxDataAge(maxAge).openEarly();
if (reader != null)
{
replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
@@ -190,29 +190,19 @@ public class SSTableRewriter
}
}
- public void abort()
+ protected Throwable doAbort(Throwable accumulate)
{
- switch (state)
- {
- case ABORTED:
- return;
- case FINISHED:
- throw new IllegalStateException("Cannot abort - changes have already been committed");
- }
- state = State.ABORTED;
-
- Throwable fail = null;
try
{
moveStarts(null, null, true);
}
catch (Throwable t)
{
- fail = merge(fail, t);
+ accumulate = merge(accumulate, t);
}
- // remove already completed SSTables
- for (SSTableReader sstable : finished)
+ // cleanup any sstables we prepared for commit
+ for (SSTableReader sstable : preparedForCommit)
{
try
{
@@ -221,50 +211,41 @@ public class SSTableRewriter
}
catch (Throwable t)
{
- fail = merge(fail, t);
+ accumulate = merge(accumulate , t);
}
}
+ // abort the writers, and add the early opened readers to our discard pile
+
if (writer != null)
- finishedEarly.add(new Finished(writer, currentlyOpenedEarly));
+ finishedWriters.add(new Finished(writer, currentlyOpenedEarly));
- // abort the writers
- for (Finished finished : finishedEarly)
+ for (Finished finished : finishedWriters)
{
- try
- {
- finished.writer.abort();
- }
- catch (Throwable t)
- {
- fail = merge(fail, t);
- }
- try
- {
- if (finished.reader != null)
- {
- // if we've already been opened, add ourselves to the discard pile
- discard.add(finished.reader);
- finished.reader.markObsolete();
- }
- }
- catch (Throwable t)
- {
- fail = merge(fail, t);
- }
- }
+ accumulate = finished.writer.abort(accumulate);
- try
- {
- replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
- }
- catch (Throwable t)
- {
- fail = merge(fail, t);
+ // if we've already been opened, add ourselves to the discard pile
+ if (finished.reader != null)
+ discard.add(finished.reader);
}
- if (fail != null)
- throw Throwables.propagate(fail);
+ accumulate = replaceWithFinishedReaders(Collections.<SSTableReader>emptyList(), accumulate);
+ return accumulate;
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ for (Finished f : finishedWriters)
+ accumulate = f.writer.commit(accumulate);
+ accumulate = replaceWithFinishedReaders(preparedForCommit, accumulate);
+
+ return accumulate;
+ }
+
+ protected Throwable doCleanup(Throwable accumulate)
+ {
+ // we have no state of our own to cleanup; Transactional objects cleanup their own state in abort or commit
+ return accumulate;
}
/**
@@ -369,41 +350,38 @@ public class SSTableRewriter
public void switchWriter(SSTableWriter newWriter)
{
- if (writer == null)
+ if (writer == null || writer.getFilePointer() == 0)
{
+ if (writer != null)
+ writer.abort();
writer = newWriter;
return;
}
- if (writer.getFilePointer() != 0)
- {
- // If early re-open is disabled, simply finalize the writer and store it
- if (preemptiveOpenInterval == Long.MAX_VALUE)
- {
- SSTableReader reader = writer.finish(SSTableWriter.FinishType.NORMAL, maxAge, -1);
- finishedReaders.add(reader);
- }
- else
- {
- // we leave it as a tmp file, but we open it and add it to the dataTracker
- SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
- replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
- moveStarts(reader, reader.last, false);
- finishedEarly.add(new Finished(writer, reader));
- }
- }
- else
+ SSTableReader reader = null;
+ if (preemptiveOpenInterval != Long.MAX_VALUE)
{
- writer.abort();
+ // we leave it as a tmp file, but we open it and add it to the dataTracker
+ reader = writer.setMaxDataAge(maxAge).openFinalEarly();
+ replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
+ moveStarts(reader, reader.last, false);
}
+ finishedWriters.add(new Finished(writer, reader));
+
currentlyOpenedEarly = null;
currentlyOpenedEarlyAt = 0;
writer = newWriter;
}
- public List<SSTableReader> finish()
+ /**
+ * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
+ * the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
+ * repair time.
+ */
+ public SSTableRewriter setRepairedAt(long repairedAt)
{
- return finish(-1);
+ this.repairedAt = repairedAt;
+ return this;
}
/**
@@ -417,94 +395,92 @@ public class SSTableRewriter
* gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..))
*
*
- * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
- * the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
- * repair time.
*/
- public List<SSTableReader> finish(long repairedAt)
+ public List<SSTableReader> finish()
{
- return finishAndMaybeThrow(repairedAt, false, false);
+ super.finish();
+ return finished();
}
- @VisibleForTesting
- void finishAndThrow(boolean throwEarly)
+ public List<SSTableReader> finished()
{
- finishAndMaybeThrow(-1, throwEarly, !throwEarly);
+ assert state() == State.COMMITTED || state() == State.READY_TO_COMMIT;
+ return preparedForCommit;
}
- private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
+ protected void doPrepare()
{
- switch (state)
- {
- case FINISHED: case ABORTED:
- throw new IllegalStateException("Cannot finish - changes have already been " + state.toString().toLowerCase());
- }
-
- List<SSTableReader> newReaders = new ArrayList<>();
switchWriter(null);
if (throwEarly)
throw new RuntimeException("exception thrown early in finish, for testing");
// No early open to finalize and replace
- if (preemptiveOpenInterval == Long.MAX_VALUE)
- {
- replaceWithFinishedReaders(finishedReaders);
- if (throwLate)
- throw new RuntimeException("exception thrown after all sstables finished, for testing");
- return finishedReaders;
- }
-
- while (!finishedEarly.isEmpty())
+ for (Finished f : finishedWriters)
{
- Finished f = finishedEarly.peek();
- if (f.writer.getFilePointer() > 0)
- {
- if (f.reader != null)
- discard.add(f.reader);
+ if (f.reader != null)
+ discard.add(f.reader);
- SSTableReader newReader = f.writer.finish(SSTableWriter.FinishType.FINISH_EARLY, maxAge, repairedAt);
+ f.writer.setRepairedAt(repairedAt).setMaxDataAge(maxAge).setOpenResult(true).prepareToCommit();
+ SSTableReader newReader = f.writer.finished();
- if (f.reader != null)
- f.reader.setReplacedBy(newReader);
+ if (f.reader != null)
+ f.reader.setReplacedBy(newReader);
- finished.add(newReader);
- newReaders.add(newReader);
- }
- else
- {
- f.writer.abort();
- assert f.reader == null;
- }
- finishedEarly.poll();
+ preparedForCommit.add(newReader);
}
if (throwLate)
throw new RuntimeException("exception thrown after all sstables finished, for testing");
+ }
- replaceWithFinishedReaders(newReaders);
- state = State.FINISHED;
- return finished;
+ @VisibleForTesting
+ void throwDuringPrepare(boolean throwEarly)
+ {
+ this.throwEarly = throwEarly;
+ this.throwLate = !throwEarly;
}
// cleanup all our temporary readers and swap in our new ones
- private void replaceWithFinishedReaders(List<SSTableReader> finished)
+ private Throwable replaceWithFinishedReaders(List<SSTableReader> finished, Throwable accumulate)
{
if (isOffline)
{
for (SSTableReader reader : discard)
{
- if (reader.getCurrentReplacement() == reader)
- reader.markObsolete();
- reader.selfRef().release();
+ try
+ {
+ if (reader.getCurrentReplacement() == reader)
+ reader.markObsolete();
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
}
+ accumulate = Refs.release(Refs.selfRefs(discard), accumulate);
}
else
{
- dataTracker.replaceEarlyOpenedFiles(discard, finished);
- dataTracker.unmarkCompacting(discard);
+ try
+ {
+ dataTracker.replaceEarlyOpenedFiles(discard, finished);
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
+ try
+ {
+ dataTracker.unmarkCompacting(discard);
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
}
discard.clear();
+ return accumulate;
}
private static final class Finished
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 5998044..d6ab940 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -157,6 +157,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
{
throw new RuntimeException(e);
}
+ checkForWriterException();
}
// This is overridden by CQLSSTableWriter to hold off replacing column family until the next iteration through
@@ -215,39 +216,40 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
public void run()
{
- SSTableWriter writer = null;
-
- while (true)
{
- try
+ while (true)
{
- Buffer b = writeQueue.take();
- if (b == SENTINEL)
- return;
-
- writer = getWriter();
- boolean first = true;
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : b.entrySet())
+ try
{
- if (entry.getValue().getColumnCount() > 0)
- writer.append(entry.getKey(), entry.getValue());
- else if (!first)
- throw new AssertionError("Empty partition");
- first = false;
+ Buffer b = writeQueue.take();
+ if (b == SENTINEL)
+ return;
+
+ try (SSTableWriter writer = getWriter();)
+ {
+ boolean first = true;
+ for (Map.Entry<DecoratedKey, ColumnFamily> entry : b.entrySet())
+ {
+ if (entry.getValue().getColumnCount() > 0)
+ writer.append(entry.getKey(), entry.getValue());
+ else if (!first)
+ throw new AssertionError("Empty partition");
+ first = false;
+ }
+
+ writer.finish(false);
+ }
+ }
+ catch (Throwable e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ // Keep only the first exception
+ if (exception == null)
+ exception = e;
}
- writer.close();
- }
- catch (Throwable e)
- {
- JVMStabilityInspector.inspectThrowable(e);
- if (writer != null)
- writer.abort();
- // Keep only the first exception
- if (exception == null)
- exception = e;
}
- }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 3417d68..f206969 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
+import com.google.common.base.Throwables;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -72,12 +74,11 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
{
if (currentKey != null)
writeRow(currentKey, columnFamily);
- writer.close();
+ writer.finish(false);
}
- catch (FSError e)
+ catch (Throwable t)
{
- writer.abort();
- throw e;
+ throw Throwables.propagate(writer.abort(t));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 4411ca7..23c27b0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -864,12 +864,21 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @param ibuilder
* @param dbuilder
*/
+
public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
- saveSummary(ibuilder, dbuilder, indexSummary);
+ saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, indexSummary);
}
- private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+ private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary newSummary)
+ {
+ saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, newSummary);
+ }
+ /**
+ * Save index summary to Summary.db file.
+ */
+ public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last,
+ SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
{
File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
if (summariesFile.exists())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index baacb5a..f99292e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -31,16 +31,17 @@ import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.concurrent.Transactional;
import java.io.DataInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
/**
@@ -49,30 +50,24 @@ import java.util.Set;
* TableWriter.create() is the primary way to create a writer for a particular format.
* The format information is part of the Descriptor.
*/
-public abstract class SSTableWriter extends SSTable
+public abstract class SSTableWriter extends SSTable implements Transactional
{
- private static final Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
-
- public static enum FinishType
- {
- CLOSE(null, true),
- NORMAL(SSTableReader.OpenReason.NORMAL, true),
- EARLY(SSTableReader.OpenReason.EARLY, false), // no renaming
- FINISH_EARLY(SSTableReader.OpenReason.NORMAL, true); // tidy up an EARLY finish
- public final SSTableReader.OpenReason openReason;
-
- public final boolean isFinal;
- FinishType(SSTableReader.OpenReason openReason, boolean isFinal)
- {
- this.openReason = openReason;
- this.isFinal = isFinal;
- }
- }
-
- protected final long repairedAt;
+ protected long repairedAt;
+ protected long maxDataAge = -1;
protected final long keyCount;
protected final MetadataCollector metadataCollector;
protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+ protected final TransactionalProxy txnProxy = txnProxy();
+
+ protected abstract TransactionalProxy txnProxy();
+
+ // due to lack of multiple inheritance, we use an inner class to proxy our Transactional implementation details
+ protected abstract class TransactionalProxy extends AbstractTransactional
+ {
+ // should be set during doPrepare()
+ protected SSTableReader finalReader;
+ protected boolean openResult;
+ }
protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
{
@@ -164,28 +159,98 @@ public abstract class SSTableWriter extends SSTable
public abstract long getOnDiskFilePointer();
- public abstract void isolateReferences();
-
public abstract void resetAndTruncate();
- public SSTableReader closeAndOpenReader()
+ public SSTableWriter setRepairedAt(long repairedAt)
+ {
+ if (repairedAt > 0)
+ this.repairedAt = repairedAt;
+ return this;
+ }
+
+ public SSTableWriter setMaxDataAge(long maxDataAge)
+ {
+ this.maxDataAge = maxDataAge;
+ return this;
+ }
+
+ public SSTableWriter setOpenResult(boolean openResult)
+ {
+ txnProxy.openResult = openResult;
+ return this;
+ }
+
+ /**
+ * Open the resultant SSTableReader before it has been fully written
+ */
+ public abstract SSTableReader openEarly();
+
+ /**
+ * Open the resultant SSTableReader once it has been fully written, but before the
+ * _set_ of tables that are being written together as one atomic operation is entirely ready
+ */
+ public abstract SSTableReader openFinalEarly();
+
+ public SSTableReader finish(long repairedAt, long maxDataAge, boolean openResult)
{
- return closeAndOpenReader(System.currentTimeMillis());
+ if (repairedAt > 0)
+ this.repairedAt = repairedAt;
+ this.maxDataAge = maxDataAge;
+ return finish(openResult);
}
- public SSTableReader closeAndOpenReader(long maxDataAge)
+ public SSTableReader finish(boolean openResult)
{
- return finish(FinishType.NORMAL, maxDataAge, repairedAt);
+ txnProxy.openResult = openResult;
+ txnProxy.finish();
+ return finished();
}
- public abstract SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt);
+ /**
+ * Open the resultant SSTableReader once it has been fully written, and all related state
+ * is ready to be finalised, including any other sstables being written as part of the same operation
+ */
+ public SSTableReader finished()
+ {
+ return txnProxy.finalReader;
+ }
+
+ // finalise our state on disk, including renaming
+ public final void prepareToCommit()
+ {
+ txnProxy.prepareToCommit();
+ }
+
+ public final Throwable commit(Throwable accumulate)
+ {
+ return txnProxy.commit(accumulate);
+ }
+
+ public final Throwable abort(Throwable accumulate)
+ {
+ return txnProxy.abort(accumulate);
+ }
- public abstract SSTableReader openEarly(long maxDataAge);
+ public final void close()
+ {
+ txnProxy.close();
+ }
- // Close the writer and return the descriptor to the new sstable and it's metadata
- public abstract Pair<Descriptor, StatsMetadata> close();
+ public final void abort()
+ {
+ txnProxy.abort();
+ }
+ protected Map<MetadataType, MetadataComponent> finalizeMetadata()
+ {
+ return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(), repairedAt);
+ }
+ protected StatsMetadata statsMetadata()
+ {
+ return (StatsMetadata) finalizeMetadata().get(MetadataType.STATS);
+ }
public static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
{
@@ -209,12 +274,6 @@ public abstract class SSTableWriter extends SSTable
}
- /**
- * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
- */
- public abstract void abort();
-
-
public static abstract class Factory
{
public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector);