You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:47:00 UTC
[6/6] cassandra git commit: Extend Transactional API to sstable lifecycle management
Extend Transactional API to sstable lifecycle management
patch by benedict; reviewed by marcus for CASSANDRA-8568
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5a76bdb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5a76bdb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5a76bdb
Branch: refs/heads/cassandra-2.2
Commit: e5a76bdb5fc04ffa16b8becaa7877186226c3b32
Parents: 33d71b8
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Mar 12 10:23:35 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri May 22 09:44:36 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 249 ++++--
.../org/apache/cassandra/db/DataTracker.java | 793 -------------------
.../cassandra/db/HintedHandOffManager.java | 2 +-
src/java/org/apache/cassandra/db/Keyspace.java | 15 +-
src/java/org/apache/cassandra/db/Memtable.java | 22 +-
.../compaction/AbstractCompactionStrategy.java | 7 +-
.../db/compaction/AbstractCompactionTask.java | 19 +-
.../db/compaction/CompactionController.java | 4 +-
.../db/compaction/CompactionManager.java | 182 +++--
.../cassandra/db/compaction/CompactionTask.java | 54 +-
.../DateTieredCompactionStrategy.java | 17 +-
.../compaction/LeveledCompactionStrategy.java | 23 +-
.../db/compaction/LeveledCompactionTask.java | 11 +-
.../db/compaction/LeveledManifest.java | 11 +-
.../db/compaction/SSTableSplitter.java | 13 +-
.../cassandra/db/compaction/Scrubber.java | 21 +-
.../SizeTieredCompactionStrategy.java | 30 +-
.../cassandra/db/compaction/Upgrader.java | 15 +-
.../compaction/WrappingCompactionStrategy.java | 2 +-
.../writers/CompactionAwareWriter.java | 11 +-
.../writers/DefaultCompactionWriter.java | 11 +-
.../writers/MajorLeveledCompactionWriter.java | 11 +-
.../writers/MaxSSTableSizeWriter.java | 10 +-
.../SplittingSizeTieredCompactionWriter.java | 14 +-
.../AbstractSimplePerColumnSecondaryIndex.java | 4 +-
.../db/index/SecondaryIndexManager.java | 2 +-
.../apache/cassandra/db/lifecycle/Helpers.java | 241 ++++++
.../db/lifecycle/LifecycleTransaction.java | 511 ++++++++++++
.../db/lifecycle/SSTableIntervalTree.java | 40 +
.../apache/cassandra/db/lifecycle/Tracker.java | 468 +++++++++++
.../org/apache/cassandra/db/lifecycle/View.java | 252 ++++++
.../io/compress/CompressionMetadata.java | 2 +-
.../io/sstable/IndexSummaryManager.java | 106 ++-
.../io/sstable/SSTableDeletingTask.java | 27 +-
.../cassandra/io/sstable/SSTableRewriter.java | 295 ++-----
.../io/sstable/format/SSTableReader.java | 100 ++-
.../io/sstable/format/big/BigTableWriter.java | 6 +-
.../cassandra/io/util/SequentialWriter.java | 2 +-
.../cassandra/metrics/ColumnFamilyMetrics.java | 18 +-
.../cassandra/streaming/StreamSession.java | 7 +-
.../cassandra/tools/StandaloneScrubber.java | 12 +-
.../cassandra/tools/StandaloneSplitter.java | 7 +-
.../cassandra/tools/StandaloneUpgrader.java | 6 +-
.../cassandra/utils/concurrent/Blocker.java | 63 ++
.../utils/concurrent/Transactional.java | 31 +-
.../db/compaction/LongCompactionsTest.java | 10 +-
test/unit/org/apache/cassandra/MockSchema.java | 167 ++++
test/unit/org/apache/cassandra/Util.java | 27 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 3 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 58 +-
.../db/compaction/AntiCompactionTest.java | 51 +-
.../compaction/CompactionAwareWriterTest.java | 45 +-
.../DateTieredCompactionStrategyTest.java | 6 +-
.../cassandra/db/lifecycle/HelpersTest.java | 158 ++++
.../db/lifecycle/LifecycleTransactionTest.java | 412 ++++++++++
.../cassandra/db/lifecycle/TrackerTest.java | 342 ++++++++
.../apache/cassandra/db/lifecycle/ViewTest.java | 202 +++++
.../io/sstable/IndexSummaryManagerTest.java | 123 ++-
.../cassandra/io/sstable/SSTableReaderTest.java | 11 +-
.../io/sstable/SSTableRewriterTest.java | 250 +++---
61 files changed, 3902 insertions(+), 1711 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8b59309..ca87385 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2
+ * Extend Transactional API to sstable lifecycle management (CASSANDRA-8568)
* (cqlsh) Add support for native protocol 4 (CASSANDRA-9399)
* Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
* Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4452db2..cc9b26a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -34,6 +34,10 @@ import com.google.common.base.Throwables;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.lifecycle.Tracker;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.FSWriteError;
import org.json.simple.*;
import org.slf4j.Logger;
@@ -80,6 +84,8 @@ import org.apache.cassandra.utils.memory.MemtableAllocator;
import com.clearspring.analytics.stream.Counter;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
@@ -149,12 +155,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/**
* Memtables and SSTables on disk for this column family.
*
- * We synchronize on the DataTracker to ensure isolation when we want to make sure
+ * We synchronize on the Tracker to ensure isolation when we want to make sure
* that the memtable we're acting on doesn't change out from under us. I.e., flush
* syncronizes on it to make sure it can submit on both executors atomically,
* so anyone else who wants to make sure flush doesn't interfere should as well.
*/
- private final DataTracker data;
+ private final Tracker data;
/* The read order, used to track accesses to off-heap memtable storage */
public final OpOrder readOrdering = new OpOrder();
@@ -288,13 +294,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- private ColumnFamilyStore(Keyspace keyspace,
+ public ColumnFamilyStore(Keyspace keyspace,
+ String columnFamilyName,
+ IPartitioner partitioner,
+ int generation,
+ CFMetaData metadata,
+ Directories directories,
+ boolean loadSSTables)
+ {
+ this(keyspace, columnFamilyName, partitioner, generation, metadata, directories, loadSSTables, true);
+ }
+
+
+ @VisibleForTesting
+ public ColumnFamilyStore(Keyspace keyspace,
String columnFamilyName,
IPartitioner partitioner,
int generation,
CFMetaData metadata,
Directories directories,
- boolean loadSSTables)
+ boolean loadSSTables,
+ boolean registerBookkeeping)
{
assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
@@ -315,7 +335,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
logger.info("Initializing {}.{}", keyspace.getName(), name);
// scan for sstables corresponding to this cf and load them
- data = new DataTracker(this, loadSSTables);
+ data = new Tracker(this, loadSSTables);
if (data.loadsstables)
{
@@ -343,46 +363,59 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
indexManager.addIndexedColumn(info);
}
- // register the mbean
- String type = this.partitioner instanceof LocalPartitioner ? "IndexColumnFamilies" : "ColumnFamilies";
- mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.keyspace.getName() + ",columnfamily=" + name;
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName nameObj = new ObjectName(mbeanName);
- mbs.registerMBean(this, nameObj);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry());
- latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
+ if (registerBookkeeping)
{
- public void run()
+ // register the mbean
+ String type = this.partitioner instanceof LocalPartitioner ? "IndexColumnFamilies" : "ColumnFamilies";
+ mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.keyspace.getName() + ",columnfamily=" + name;
+ try
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName nameObj = new ObjectName(mbeanName);
+ mbs.registerMBean(this, nameObj);
+ }
+ catch (Exception e)
{
- SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry();
- switch (retryPolicy.type)
+ throw new RuntimeException(e);
+ }
+ logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry());
+ latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
+ {
+ public void run()
{
- case PERCENTILE:
- // get percentile in nanos
- sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.value) * 1000d);
- break;
- case CUSTOM:
- // convert to nanos, since configuration is in millisecond
- sampleLatencyNanos = (long) (retryPolicy.value * 1000d * 1000d);
- break;
- default:
- sampleLatencyNanos = Long.MAX_VALUE;
- break;
+ SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry();
+ switch (retryPolicy.type)
+ {
+ case PERCENTILE:
+ // get percentile in nanos
+ sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.value) * 1000d);
+ break;
+ case CUSTOM:
+ // convert to nanos, since configuration is in millisecond
+ sampleLatencyNanos = (long) (retryPolicy.value * 1000d * 1000d);
+ break;
+ default:
+ sampleLatencyNanos = Long.MAX_VALUE;
+ break;
+ }
}
- }
- }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
+ }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ latencyCalculator = ScheduledExecutors.optionalTasks.schedule(Runnables.doNothing(), 0, TimeUnit.NANOSECONDS);
+ mbeanName = null;
+ }
}
/** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
public void invalidate()
{
+ invalidate(true);
+ }
+
+ public void invalidate(boolean expectMBean)
+ {
// disable and cancel in-progress compactions before invalidating
valid = false;
@@ -392,21 +425,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
catch (Exception e)
{
- JVMStabilityInspector.inspectThrowable(e);
- // this shouldn't block anything.
- logger.warn("Failed unregistering mbean: {}", mbeanName, e);
+ if (expectMBean)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ // this shouldn't block anything.
+ logger.warn("Failed unregistering mbean: {}", mbeanName, e);
+ }
}
latencyCalculator.cancel(false);
SystemKeyspace.removeTruncationRecord(metadata.cfId);
- data.unreferenceSSTables();
+ data.dropSSTables();
indexManager.invalidate();
invalidateCaches();
}
/**
- * Removes every SSTable in the directory from the DataTracker's view.
+ * Removes every SSTable in the directory from the Tracker's view.
* @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
*/
void maybeRemoveUnreadableSSTables(File directory)
@@ -542,7 +578,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
/**
- * Replacing compacted sstables is atomic as far as observers of DataTracker are concerned, but not on the
+ * Replacing compacted sstables is atomic as far as observers of Tracker are concerned, but not on the
* filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then
* their ancestors are removed.
*
@@ -826,7 +862,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/*
* switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete,
* we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
- * This method does not block except for synchronizing on DataTracker, but the Future it returns will
+ * This method does not block except for synchronizing on Tracker, but the Future it returns will
* not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL
* marked clean up to the position owned by the Memtable.
*/
@@ -849,7 +885,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// reclaiming includes that which we are GC-ing;
float onHeapRatio = 0, offHeapRatio = 0;
long onHeapTotal = 0, offHeapTotal = 0;
- Memtable memtable = getDataTracker().getView().getCurrentMemtable();
+ Memtable memtable = getTracker().getView().getCurrentMemtable();
onHeapRatio += memtable.getAllocator().onHeap().ownershipRatio();
offHeapRatio += memtable.getAllocator().offHeap().ownershipRatio();
onHeapTotal += memtable.getAllocator().onHeap().owns();
@@ -859,7 +895,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
if (index.getIndexCfs() != null)
{
- MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator();
+ MemtableAllocator allocator = index.getIndexCfs().getTracker().getView().getCurrentMemtable().getAllocator();
onHeapRatio += allocator.onHeap().ownershipRatio();
offHeapRatio += allocator.offHeap().ownershipRatio();
onHeapTotal += allocator.onHeap().owns();
@@ -984,7 +1020,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
/**
- * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the DataTracker monitor.
+ * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the Tracker monitor.
* In the constructor the current memtable(s) are swapped, and a barrier on outstanding writes is issued;
* when run by the flushWriter the barrier is waited on to ensure all outstanding writes have completed
* before all memtables are immediately written, and the CL is either immediately marked clean or, if
@@ -1117,7 +1153,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
// to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only
// swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
- Memtable current = cfs.getDataTracker().getView().getCurrentMemtable();
+ Memtable current = cfs.getTracker().getView().getCurrentMemtable();
// find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
// both on- and off-heap, and select the largest of the two ratios to weight this CF
@@ -1129,7 +1165,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
if (index.getIndexCfs() != null)
{
- MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator();
+ MemtableAllocator allocator = index.getIndexCfs().getTracker().getView().getCurrentMemtable().getAllocator();
onHeap += allocator.onHeap().ownershipRatio();
offHeap += allocator.offHeap().ownershipRatio();
}
@@ -1278,7 +1314,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (!sstables.iterator().hasNext())
return ImmutableSet.of();
- DataTracker.SSTableIntervalTree tree = data.getView().intervalTree;
+ SSTableIntervalTree tree = data.getView().intervalTree;
Set<SSTableReader> results = null;
for (SSTableReader sstable : sstables)
@@ -1454,7 +1490,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
{
assert !sstables.isEmpty();
- data.markObsolete(sstables, compactionType);
+ maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null));
}
void replaceFlushed(Memtable memtable, SSTableReader sstable)
@@ -1473,7 +1509,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/**
* Package protected for access from the CompactionManager.
*/
- public DataTracker getDataTracker()
+ public Tracker getTracker()
{
return data;
}
@@ -1485,7 +1521,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public Set<SSTableReader> getUncompactingSSTables()
{
- return data.getUncompactingSSTables();
+ return data.getUncompacting();
}
public ColumnFamily getColumnFamily(DecoratedKey key,
@@ -1759,7 +1795,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return repairedSSTables;
}
- public RefViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter)
+ public RefViewFragment selectAndReference(Function<View, List<SSTableReader>> filter)
{
while (true)
{
@@ -1770,9 +1806,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- public ViewFragment select(Function<DataTracker.View, List<SSTableReader>> filter)
+ public ViewFragment select(Function<View, List<SSTableReader>> filter)
{
- DataTracker.View view = data.getView();
+ View view = data.getView();
List<SSTableReader> sstables = view.intervalTree.isEmpty()
? Collections.<SSTableReader>emptyList()
: filter.apply(view);
@@ -1784,12 +1820,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for the given @param key, according to the interval tree
*/
- public Function<DataTracker.View, List<SSTableReader>> viewFilter(final DecoratedKey key)
+ public Function<View, List<SSTableReader>> viewFilter(final DecoratedKey key)
{
assert !key.isMinimum();
- return new Function<DataTracker.View, List<SSTableReader>>()
+ return new Function<View, List<SSTableReader>>()
{
- public List<SSTableReader> apply(DataTracker.View view)
+ public List<SSTableReader> apply(View view)
{
return compactionStrategyWrapper.filterSSTablesForReads(view.intervalTree.search(key));
}
@@ -1800,17 +1836,43 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for rows within @param rowBounds, inclusive, according to the interval tree.
*/
- public Function<DataTracker.View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
+ public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
{
- return new Function<DataTracker.View, List<SSTableReader>>()
+ return new Function<View, List<SSTableReader>>()
{
- public List<SSTableReader> apply(DataTracker.View view)
+ public List<SSTableReader> apply(View view)
{
return compactionStrategyWrapper.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
}
};
}
+ /**
+ * @return a ViewFragment containing the sstables and memtables that may need to be merged
+ * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
+ */
+ public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
+ {
+ return new Function<View, List<SSTableReader>>()
+ {
+ public List<SSTableReader> apply(View view)
+ {
+ Set<SSTableReader> sstables = Sets.newHashSet();
+ for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
+ {
+ for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+ {
+ if (includeRepaired || !sstable.isRepaired())
+ sstables.add(sstable);
+ }
+ }
+
+ logger.debug("ViewFilter for {}/{} sstables", sstables.size(), getSSTables().size());
+ return ImmutableList.copyOf(sstables);
+ }
+ };
+ }
+
public List<String> getSSTablesForKey(String key)
{
DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key));
@@ -2388,6 +2450,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* thread safety. All we do is wipe the sstable containers clean, while leaving the actual
* data files present on disk. (This allows tests to easily call loadNewSSTables on them.)
*/
+ @VisibleForTesting
public void clearUnsafe()
{
for (final ColumnFamilyStore cfs : concatWithIndexes())
@@ -2396,7 +2459,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
public Void call()
{
- cfs.data.init();
+ cfs.data.reset();
return null;
}
}, true);
@@ -2489,7 +2552,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// doublecheck that we finished, instead of timing out
for (ColumnFamilyStore cfs : selfWithIndexes)
{
- if (!cfs.getDataTracker().getCompacting().isEmpty())
+ if (!cfs.getTracker().getCompacting().isEmpty())
{
logger.warn("Unable to cancel in-progress compactions for {}. Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.cfName);
return null;
@@ -2515,19 +2578,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- public Iterable<SSTableReader> markAllCompacting()
+ public LifecycleTransaction markAllCompacting(final OperationType operationType)
{
- Callable<Iterable<SSTableReader>> callable = new Callable<Iterable<SSTableReader>>()
+ Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>()
{
- public Iterable<SSTableReader> call() throws Exception
+ public LifecycleTransaction call() throws Exception
{
assert data.getCompacting().isEmpty() : data.getCompacting();
Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
if (Iterables.isEmpty(sstables))
- return Collections.emptyList();
- boolean success = data.markCompacting(sstables);
- assert success : "something marked things compacting while compactions are disabled";
- return sstables;
+ return null;
+ LifecycleTransaction modifier = data.tryModify(sstables, operationType);
+ assert modifier != null: "something marked things compacting while compactions are disabled";
+ return modifier;
}
};
@@ -2634,12 +2697,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public int getMeanColumns()
{
- return data.getMeanColumns();
+ long sum = 0;
+ long count = 0;
+ for (SSTableReader sstable : getSSTables())
+ {
+ long n = sstable.getEstimatedColumnCount().count();
+ sum += sstable.getEstimatedColumnCount().mean() * n;
+ count += n;
+ }
+ return count > 0 ? (int) (sum / count) : 0;
}
public long estimateKeys()
{
- return data.estimatedKeys();
+ long n = 0;
+ for (SSTableReader sstable : getSSTables())
+ n += sstable.estimatedKeys();
+ return n;
}
/** true if this CFS contains secondary index data */
@@ -2703,18 +2777,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- /**
- * Returns the creation time of the oldest memtable not fully flushed yet.
- */
- public long oldestUnflushedMemtable()
- {
- return data.getView().getOldestMemtable().creationTime();
- }
-
public boolean isEmpty()
{
- DataTracker.View view = data.getView();
- return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
+ View view = data.getView();
+ return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0;
}
private boolean isRowCacheEnabled()
@@ -2753,7 +2819,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public double getDroppableTombstoneRatio()
{
- return getDataTracker().getDroppableTombstoneRatio();
+ double allDroppable = 0;
+ long allColumns = 0;
+ int localTime = (int)(System.currentTimeMillis()/1000);
+
+ for (SSTableReader sstable : getSSTables())
+ {
+ allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds());
+ allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count();
+ }
+ return allColumns > 0 ? allDroppable / allColumns : 0;
}
public long trueSnapshotsSize()
@@ -2770,9 +2845,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// returns the "canonical" version of any current sstable, i.e. if an sstable is being replaced and is only partially
// visible to reads, this sstable will be returned as its original entirety, and its replacement will not be returned
// (even if it completely replaces it)
- public static final Function<DataTracker.View, List<SSTableReader>> CANONICAL_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>()
+ public static final Function<View, List<SSTableReader>> CANONICAL_SSTABLES = new Function<View, List<SSTableReader>>()
{
- public List<SSTableReader> apply(DataTracker.View view)
+ public List<SSTableReader> apply(View view)
{
List<SSTableReader> sstables = new ArrayList<>();
for (SSTableReader sstable : view.compacting)
@@ -2785,9 +2860,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
};
- public static final Function<DataTracker.View, List<SSTableReader>> UNREPAIRED_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>()
+ public static final Function<View, List<SSTableReader>> UNREPAIRED_SSTABLES = new Function<View, List<SSTableReader>>()
{
- public List<SSTableReader> apply(DataTracker.View view)
+ public List<SSTableReader> apply(View view)
{
List<SSTableReader> sstables = new ArrayList<>();
for (SSTableReader sstable : CANONICAL_SSTABLES.apply(view))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
deleted file mode 100644
index 36f22c5..0000000
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ /dev/null
@@ -1,793 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.File;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.*;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.notifications.*;
-import org.apache.cassandra.utils.Interval;
-import org.apache.cassandra.utils.IntervalTree;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-public class DataTracker
-{
- private static final Logger logger = LoggerFactory.getLogger(DataTracker.class);
-
- public final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>();
- public final ColumnFamilyStore cfstore;
- private final AtomicReference<View> view;
-
- // Indicates if it is safe to load the initial sstables (may not be true when running in
- //standalone processes meant to repair or upgrade sstables (e.g. standalone scrubber)
- public final boolean loadsstables;
-
- public DataTracker(ColumnFamilyStore cfstore, boolean loadsstables)
- {
- this.cfstore = cfstore;
- this.view = new AtomicReference<>();
- this.loadsstables = loadsstables;
- this.init();
- }
-
- // get the Memtable that the ordered writeOp should be directed to
- public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
- {
- // since any new memtables appended to the list after we fetch it will be for operations started
- // after us, we can safely assume that we will always find the memtable that 'accepts' us;
- // if the barrier for any memtable is set whilst we are reading the list, it must accept us.
-
- // there may be multiple memtables in the list that would 'accept' us, however we only ever choose
- // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
- // assign operations to a memtable that was retired/queued before we started)
- for (Memtable memtable : view.get().liveMemtables)
- {
- if (memtable.accepts(opGroup, replayPosition))
- return memtable;
- }
- throw new AssertionError(view.get().liveMemtables.toString());
- }
-
- public Set<SSTableReader> getSSTables()
- {
- return view.get().sstables;
- }
-
- public Set<SSTableReader> getUncompactingSSTables()
- {
- return view.get().nonCompactingSStables();
- }
-
- public Iterable<SSTableReader> getUncompactingSSTables(Iterable<SSTableReader> candidates)
- {
- final View v = view.get();
- return Iterables.filter(candidates, new Predicate<SSTableReader>()
- {
- public boolean apply(SSTableReader sstable)
- {
- return !v.compacting.contains(sstable);
- }
- });
- }
-
- public View getView()
- {
- return view.get();
- }
-
- /**
- * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables,
- * returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until
- * discarding(memtable) is called. These two methods must be synchronized/paired, i.e. m = switchMemtable
- * must be followed by discarding(m), they cannot be interleaved.
- *
- * @return the previously active memtable
- */
- public Memtable switchMemtable(boolean truncating)
- {
- Memtable newMemtable = new Memtable(cfstore);
- Memtable toFlushMemtable;
- View currentView, newView;
- do
- {
- currentView = view.get();
- toFlushMemtable = currentView.getCurrentMemtable();
- newView = currentView.switchMemtable(newMemtable);
- }
- while (!view.compareAndSet(currentView, newView));
-
- if (truncating)
- notifyRenewed(newMemtable);
-
- return toFlushMemtable;
- }
-
- public void markFlushing(Memtable memtable)
- {
- View currentView, newView;
- do
- {
- currentView = view.get();
- newView = currentView.markFlushing(memtable);
- }
- while (!view.compareAndSet(currentView, newView));
- }
-
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
- {
- // sstable may be null if we flushed batchlog and nothing needed to be retained
-
- if (!cfstore.isValid())
- {
- View currentView, newView;
- do
- {
- currentView = view.get();
- newView = currentView.replaceFlushed(memtable, sstable);
- if (sstable != null)
- newView = newView.replace(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
- }
- while (!view.compareAndSet(currentView, newView));
- return;
- }
-
- // back up before creating a new View (which makes the new one eligible for compaction)
- if (sstable != null)
- maybeIncrementallyBackup(sstable);
-
- View currentView, newView;
- do
- {
- currentView = view.get();
- newView = currentView.replaceFlushed(memtable, sstable);
- }
- while (!view.compareAndSet(currentView, newView));
-
- if (sstable != null)
- {
- addNewSSTablesSize(Arrays.asList(sstable));
- notifyAdded(sstable);
- }
- }
-
- public void maybeIncrementallyBackup(final SSTableReader sstable)
- {
- if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
- return;
-
- File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
- sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
- }
-
- /**
- * @return true if we are able to mark the given @param sstables as compacted, before anyone else
- *
- * Note that we could acquire references on the marked sstables and release them in
- * unmarkCompacting, but since we will never call markObsolete on a sstable marked
- * as compacting (unless there is a serious bug), we can skip this.
- */
- public boolean markCompacting(Iterable<SSTableReader> sstables)
- {
- return markCompacting(sstables, false, false);
- }
-
- public boolean markCompacting(Iterable<SSTableReader> sstables, boolean newTables, boolean offline)
- {
- assert sstables != null && !Iterables.isEmpty(sstables);
- while (true)
- {
- final View currentView = view.get();
- if (Iterables.any(sstables, Predicates.in(currentView.compacting)))
- return false;
-
- Predicate<SSTableReader> live = new Predicate<SSTableReader>()
- {
- public boolean apply(SSTableReader sstable)
- {
- return currentView.sstablesMap.get(sstable) == sstable && !sstable.isMarkedCompacted();
- }
- };
- if (newTables)
- assert !Iterables.any(sstables, Predicates.in(currentView.sstables));
- else if (!offline && !Iterables.all(sstables, live))
- return false;
-
- View newView = currentView.markCompacting(sstables);
- if (view.compareAndSet(currentView, newView))
- return true;
- }
- }
-
- /**
- * Removes files from compacting status: this is different from 'markObsolete'
- * because it should be run regardless of whether a compaction succeeded.
- */
- public void unmarkCompacting(Iterable<SSTableReader> unmark)
- {
- View currentView, newView;
- do
- {
- currentView = view.get();
- newView = currentView.unmarkCompacting(unmark);
- }
- while (!view.compareAndSet(currentView, newView));
-
- if (!cfstore.isValid())
- {
- // when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals
- // with sstables that aren't currently being compacted. If there are ongoing compactions that finish or are
- // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
- unreferenceSSTables();
- }
- }
-
- public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
- {
- removeSSTablesFromTracker(sstables);
- releaseReferences(sstables, false);
- notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
- }
-
- /**
- *
- * @param oldSSTables
- * @param allReplacements
- * @param compactionType
- */
- // note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners
- // that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call
- public void markCompactedSSTablesReplaced(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> allReplacements, OperationType compactionType)
- {
- removeSSTablesFromTracker(oldSSTables);
- releaseReferences(oldSSTables, false);
- notifySSTablesChanged(oldSSTables, allReplacements, compactionType);
- addNewSSTablesSize(allReplacements);
- }
-
- public void addInitialSSTables(Collection<SSTableReader> sstables)
- {
- addSSTablesToTracker(sstables);
- // no notifications or backup necessary
- }
-
- public void addSSTables(Collection<SSTableReader> sstables)
- {
- addSSTablesToTracker(sstables);
- for (SSTableReader sstable : sstables)
- {
- maybeIncrementallyBackup(sstable);
- notifyAdded(sstable);
- }
- }
-
- /**
- * Replaces existing sstables with new instances, makes sure compaction strategies have the correct instance
- *
- * @param toReplace
- * @param replaceWith
- */
- public void replaceWithNewInstances(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
- {
- replaceReaders(toReplace, replaceWith, true);
- }
-
- /**
- * Adds the early opened files to the data tracker, but does not tell compaction strategies about it
- *
- * note that we dont track the live size of these sstables
- * @param toReplace
- * @param replaceWith
- */
- public void replaceEarlyOpenedFiles(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
- {
- for (SSTableReader s : toReplace)
- assert s.openReason == SSTableReader.OpenReason.EARLY;
- // note that we can replace an early opened file with a real one
- replaceReaders(toReplace, replaceWith, false);
- }
-
- /**
- * removes all sstables that are not busy compacting.
- */
- public void unreferenceSSTables()
- {
- Set<SSTableReader> notCompacting;
-
- View currentView, newView;
- do
- {
- currentView = view.get();
- if (!currentView.compacting.isEmpty())
- logger.error("Set of compacting sstables is non-empty when invalidating sstables {}", currentView.compacting);
- notCompacting = currentView.nonCompactingSStables();
- newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet());
- }
- while (!view.compareAndSet(currentView, newView));
-
- if (notCompacting.isEmpty())
- {
- // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
- return;
- }
- notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
- removeOldSSTablesSize(notCompacting);
- releaseReferences(notCompacting, true);
- }
-
- /**
- * Removes every SSTable in the directory from the DataTracker's view.
- * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
- */
- void removeUnreadableSSTables(File directory)
- {
- View currentView, newView;
- Set<SSTableReader> remaining = new HashSet<>();
- do
- {
- currentView = view.get();
- for (SSTableReader r : currentView.nonCompactingSStables())
- if (!r.descriptor.directory.equals(directory))
- remaining.add(r);
-
- if (remaining.size() == currentView.nonCompactingSStables().size())
- return;
-
- newView = currentView.replace(currentView.sstables, remaining);
- }
- while (!view.compareAndSet(currentView, newView));
- for (SSTableReader sstable : currentView.sstables)
- if (!remaining.contains(sstable))
- sstable.selfRef().release();
- notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
- }
-
- /** (Re)initializes the tracker, purging all references. */
- void init()
- {
- view.set(new View(
- ImmutableList.of(new Memtable(cfstore)),
- ImmutableList.<Memtable>of(),
- Collections.<SSTableReader, SSTableReader>emptyMap(),
- Collections.<SSTableReader>emptySet(),
- Collections.<SSTableReader>emptySet(),
- SSTableIntervalTree.empty()));
- }
-
- /**
- * A special kind of replacement for SSTableReaders that were cloned with a new index summary sampling level (see
- * SSTableReader.cloneWithNewSummarySamplingLevel and CASSANDRA-5519). This does not mark the old reader
- * as compacted.
- * @param oldSSTables replaced readers
- * @param newSSTables replacement readers
- */
- private void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
- {
- View currentView, newView;
- do
- {
- currentView = view.get();
- newView = currentView.replace(oldSSTables, newSSTables);
- }
- while (!view.compareAndSet(currentView, newView));
-
- if (!oldSSTables.isEmpty() && notify)
- notifySSTablesChanged(oldSSTables, newSSTables, OperationType.UNKNOWN);
-
- for (SSTableReader sstable : newSSTables)
- sstable.setTrackedBy(this);
-
- Refs.release(Refs.selfRefs(oldSSTables));
- }
-
- private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
- {
- View currentView, newView;
- do
- {
- currentView = view.get();
- newView = currentView.replace(oldSSTables, Collections.<SSTableReader>emptyList());
- }
- while (!view.compareAndSet(currentView, newView));
- removeOldSSTablesSize(oldSSTables);
- }
-
- private void addSSTablesToTracker(Collection<SSTableReader> sstables)
- {
- View currentView, newView;
- do
- {
- currentView = view.get();
- newView = currentView.replace(Collections.<SSTableReader>emptyList(), sstables);
- }
- while (!view.compareAndSet(currentView, newView));
- addNewSSTablesSize(sstables);
- }
-
- private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
- {
- for (SSTableReader sstable : newSSTables)
- {
- if (logger.isDebugEnabled())
- logger.debug(String.format("adding %s to list of files tracked for %s.%s",
- sstable.descriptor, cfstore.keyspace.getName(), cfstore.name));
- long size = sstable.bytesOnDisk();
- StorageMetrics.load.inc(size);
- cfstore.metric.liveDiskSpaceUsed.inc(size);
- cfstore.metric.totalDiskSpaceUsed.inc(size);
- sstable.setTrackedBy(this);
- }
- }
-
- private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
- {
- for (SSTableReader sstable : oldSSTables)
- {
- if (logger.isDebugEnabled())
- logger.debug(String.format("removing %s from list of files tracked for %s.%s",
- sstable.descriptor, cfstore.keyspace.getName(), cfstore.name));
- long size = sstable.bytesOnDisk();
- StorageMetrics.load.dec(size);
- cfstore.metric.liveDiskSpaceUsed.dec(size);
- }
- }
-
- private void releaseReferences(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
- {
- for (SSTableReader sstable : oldSSTables)
- {
- boolean firstToCompact = sstable.markObsolete();
- assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
- sstable.selfRef().release();
- }
- }
-
- public void spaceReclaimed(long size)
- {
- cfstore.metric.totalDiskSpaceUsed.dec(size);
- }
-
- public long estimatedKeys()
- {
- long n = 0;
- for (SSTableReader sstable : getSSTables())
- n += sstable.estimatedKeys();
- return n;
- }
-
- public int getMeanColumns()
- {
- long sum = 0;
- long count = 0;
- for (SSTableReader sstable : getSSTables())
- {
- long n = sstable.getEstimatedColumnCount().count();
- sum += sstable.getEstimatedColumnCount().mean() * n;
- count += n;
- }
- return count > 0 ? (int) (sum / count) : 0;
- }
-
- public double getDroppableTombstoneRatio()
- {
- double allDroppable = 0;
- long allColumns = 0;
- int localTime = (int)(System.currentTimeMillis()/1000);
-
- for (SSTableReader sstable : getSSTables())
- {
- allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds());
- allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count();
- }
- return allColumns > 0 ? allDroppable / allColumns : 0;
- }
-
- public void notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType)
- {
- INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
- for (INotificationConsumer subscriber : subscribers)
- subscriber.handleNotification(notification, this);
- }
-
- public void notifyAdded(SSTableReader added)
- {
- INotification notification = new SSTableAddedNotification(added);
- for (INotificationConsumer subscriber : subscribers)
- subscriber.handleNotification(notification, this);
- }
-
- public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
- {
- INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
- for (INotificationConsumer subscriber : subscribers)
- subscriber.handleNotification(notification, this);
-
- }
-
- public void notifyDeleting(SSTableReader deleting)
- {
- INotification notification = new SSTableDeletingNotification(deleting);
- for (INotificationConsumer subscriber : subscribers)
- subscriber.handleNotification(notification, this);
- }
-
- public void notifyRenewed(Memtable renewed)
- {
- INotification notification = new MemtableRenewedNotification(renewed);
- for (INotificationConsumer subscriber : subscribers)
- subscriber.handleNotification(notification, this);
- }
-
- public void notifyTruncated(long truncatedAt)
- {
- INotification notification = new TruncationNotification(truncatedAt);
- for (INotificationConsumer subscriber : subscribers)
- subscriber.handleNotification(notification, this);
- }
-
- public void subscribe(INotificationConsumer consumer)
- {
- subscribers.add(consumer);
- }
-
- public void unsubscribe(INotificationConsumer consumer)
- {
- subscribers.remove(consumer);
- }
-
- public static SSTableIntervalTree buildIntervalTree(Iterable<SSTableReader> sstables)
- {
- return new SSTableIntervalTree(buildIntervals(sstables));
- }
-
- public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
- {
- List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
- for (SSTableReader sstable : sstables)
- intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
- return intervals;
- }
-
- public Set<SSTableReader> getCompacting()
- {
- return getView().compacting;
- }
-
- public static class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
- {
- private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);
-
- private SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals)
- {
- super(intervals);
- }
-
- public static SSTableIntervalTree empty()
- {
- return EMPTY;
- }
- }
-
- /**
- * An immutable structure holding the current memtable, the memtables pending
- * flush, the sstables for a column family, and the sstables that are active
- * in compaction (a subset of the sstables).
- */
- public static class View
- {
- /**
- * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
- * and the new replacement memtable, until all outstanding write operations on the old table complete.
- * The last item in the list is always the "current" memtable.
- */
- private final List<Memtable> liveMemtables;
- /**
- * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
- * flushed. In chronologically ascending order.
- */
- private final List<Memtable> flushingMemtables;
- public final Set<SSTableReader> compacting;
- public final Set<SSTableReader> sstables;
- // we use a Map here so that we can easily perform identity checks as well as equality checks.
- // When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
- // and we then check that not only are they all present in the live set, but that the exact instance present is
- // the one we made our decision to compact against.
- public final Map<SSTableReader, SSTableReader> sstablesMap;
-
- // all sstables that are still in the live set, but have been completely shadowed by a replacement sstable
- public final Set<SSTableReader> shadowed;
- public final SSTableIntervalTree intervalTree;
-
- View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree)
- {
- this.shadowed = shadowed;
- assert liveMemtables != null;
- assert flushingMemtables != null;
- assert sstables != null;
- assert compacting != null;
- assert intervalTree != null;
-
- this.liveMemtables = liveMemtables;
- this.flushingMemtables = flushingMemtables;
-
- this.sstablesMap = sstables;
- this.sstables = sstablesMap.keySet();
- this.compacting = compacting;
- this.intervalTree = intervalTree;
- }
-
- public Memtable getOldestMemtable()
- {
- if (!flushingMemtables.isEmpty())
- return flushingMemtables.get(0);
- return liveMemtables.get(0);
- }
-
- public Memtable getCurrentMemtable()
- {
- return liveMemtables.get(liveMemtables.size() - 1);
- }
-
- public Iterable<Memtable> getMemtablesPendingFlush()
- {
- if (liveMemtables.size() == 1)
- return flushingMemtables;
- return Iterables.concat(liveMemtables.subList(0, 1), flushingMemtables);
- }
-
- /**
- * @return the active memtable and all the memtables that are pending flush.
- */
- public Iterable<Memtable> getAllMemtables()
- {
- return Iterables.concat(flushingMemtables, liveMemtables);
- }
-
- public Sets.SetView<SSTableReader> nonCompactingSStables()
- {
- return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
- }
-
- View switchMemtable(Memtable newMemtable)
- {
- List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
- return new View(newLiveMemtables, flushingMemtables, sstablesMap, compacting, shadowed, intervalTree);
- }
-
- View markFlushing(Memtable toFlushMemtable)
- {
- List<Memtable> live = liveMemtables, flushing = flushingMemtables;
-
- // since we can have multiple flushes queued, we may occasionally race and start a flush out of order,
- // so must locate it in the list to remove, rather than just removing from the beginning
- int i = live.indexOf(toFlushMemtable);
- assert i < live.size() - 1;
- List<Memtable> newLive = ImmutableList.<Memtable>builder()
- .addAll(live.subList(0, i))
- .addAll(live.subList(i + 1, live.size()))
- .build();
-
- // similarly, if we out-of-order markFlushing once, we may afterwards need to insert a memtable into the
- // flushing list in a position other than the end, though this will be rare
- i = flushing.size();
- while (i > 0 && flushing.get(i - 1).creationTime() > toFlushMemtable.creationTime())
- i--;
- List<Memtable> newFlushing = ImmutableList.<Memtable>builder()
- .addAll(flushing.subList(0, i))
- .add(toFlushMemtable)
- .addAll(flushing.subList(i, flushing.size()))
- .build();
-
- return new View(newLive, newFlushing, sstablesMap, compacting, shadowed, intervalTree);
- }
-
- View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
- {
- int index = flushingMemtables.indexOf(flushedMemtable);
- List<Memtable> newQueuedMemtables = ImmutableList.<Memtable>builder()
- .addAll(flushingMemtables.subList(0, index))
- .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
- .build();
- Map<SSTableReader, SSTableReader> newSSTables = sstablesMap;
- SSTableIntervalTree intervalTree = this.intervalTree;
- if (newSSTable != null)
- {
- assert !sstables.contains(newSSTable);
- assert !shadowed.contains(newSSTable);
- newSSTables = ImmutableMap.<SSTableReader, SSTableReader>builder()
- .putAll(sstablesMap).put(newSSTable, newSSTable).build();
- intervalTree = buildIntervalTree(newSSTables.keySet());
- }
- return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, shadowed, intervalTree);
- }
-
- View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
- {
- ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
- int newSSTablesSize = shadowed.size() + sstables.size() - oldSSTables.size() + Iterables.size(replacements);
- assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
- Map<SSTableReader, SSTableReader> newSSTables = new HashMap<>(newSSTablesSize);
- Set<SSTableReader> newShadowed = new HashSet<>(shadowed.size());
-
- for (SSTableReader sstable : sstables)
- if (!oldSet.contains(sstable))
- newSSTables.put(sstable, sstable);
-
- for (SSTableReader sstable : shadowed)
- if (!oldSet.contains(sstable))
- newShadowed.add(sstable);
-
- for (SSTableReader replacement : replacements)
- {
- if (replacement.openReason == SSTableReader.OpenReason.SHADOWED)
- newShadowed.add(replacement);
- else
- newSSTables.put(replacement, replacement);
- }
-
- assert newSSTables.size() + newShadowed.size() == newSSTablesSize :
- String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
- newSSTablesSize, newSSTables.size() + newShadowed.size(), oldSSTables, replacements, this);
- newShadowed = ImmutableSet.copyOf(newShadowed);
- newSSTables = ImmutableMap.copyOf(newSSTables);
- SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables.keySet());
- return new View(liveMemtables, flushingMemtables, newSSTables, compacting, newShadowed, intervalTree);
- }
-
- View markCompacting(Iterable<SSTableReader> tomark)
- {
- Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
- return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree);
- }
-
- View unmarkCompacting(Iterable<SSTableReader> tounmark)
- {
- Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
- return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree);
- }
-
- @Override
- public String toString()
- {
- return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
- }
-
- public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
- {
- if (intervalTree.isEmpty())
- return Collections.emptyList();
- RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
- return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 589958e..df8820b 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -250,7 +250,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
protected synchronized void compact()
{
ArrayList<Descriptor> descriptors = new ArrayList<>();
- for (SSTable sstable : hintStore.getDataTracker().getUncompactingSSTables())
+ for (SSTable sstable : hintStore.getTracker().getUncompacting())
descriptors.add(sstable.descriptor);
if (descriptors.isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index f30bdaa..1d86784 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,7 +146,7 @@ public class Keyspace
}
/**
- * Removes every SSTable in the directory from the appropriate DataTracker's view.
+ * Removes every SSTable in the directory from the appropriate Tracker's view.
* @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
*/
public static void removeUnreadableSSTables(File directory)
@@ -276,6 +275,18 @@ public class Keyspace
}
}
+ private Keyspace(KSMetaData metadata)
+ {
+ this.metadata = metadata;
+ createReplicationStrategy(metadata);
+ this.metric = new KeyspaceMetrics(this);
+ }
+
+ public static Keyspace mockKS(KSMetaData metadata)
+ {
+ return new Keyspace(metadata);
+ }
+
public void createReplicationStrategy(KSMetaData ksm)
{
replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index eab64ae..55b0bfe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -27,6 +27,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -46,7 +49,7 @@ import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.*;
-public class Memtable
+public class Memtable implements Comparable<Memtable>
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
@@ -64,6 +67,11 @@ public class Memtable
// the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+ public int compareTo(Memtable that)
+ {
+ return this.minReplayPosition.compareTo(that.minReplayPosition);
+ }
+
public static final class LastReplayPosition extends ReplayPosition
{
public LastReplayPosition(ReplayPosition copy) {
@@ -92,6 +100,15 @@ public class Memtable
this.cfs.scheduleFlush();
}
+ // ONLY to be used for testing, to create a mock Memtable
+ @VisibleForTesting
+ public Memtable(CFMetaData metadata)
+ {
+ this.initialComparator = metadata.comparator;
+ this.cfs = null;
+ this.allocator = null;
+ }
+
public MemtableAllocator getAllocator()
{
return allocator;
@@ -107,7 +124,8 @@ public class Memtable
return currentOperations.get();
}
- void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
+ @VisibleForTesting
+ public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
{
assert this.writeBarrier == null;
this.lastReplayPosition = lastReplayPosition;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 29826b8..38107c0 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -175,9 +176,9 @@ public abstract class AbstractCompactionStrategy
*/
public abstract AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore);
- public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, final int gcBefore, long maxSSTableBytes)
+ public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final int gcBefore, long maxSSTableBytes)
{
- return new CompactionTask(cfs, sstables, gcBefore, false);
+ return new CompactionTask(cfs, txn, gcBefore, false);
}
/**
@@ -231,7 +232,7 @@ public abstract class AbstractCompactionStrategy
*/
public void replaceFlushed(Memtable memtable, SSTableReader sstable)
{
- cfs.getDataTracker().replaceFlushed(memtable, sstable);
+ cfs.getTracker().replaceFlushed(memtable, sstable);
if (sstable != null)
CompactionManager.instance.submitBackground(cfs);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index ac646ef..3bf224e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -24,27 +24,28 @@ import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorSt
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
public abstract class AbstractCompactionTask extends WrappedRunnable
{
protected final ColumnFamilyStore cfs;
- protected Set<SSTableReader> sstables;
+ protected LifecycleTransaction transaction;
protected boolean isUserDefined;
protected OperationType compactionType;
/**
* @param cfs
- * @param sstables must be marked compacting
+ * @param transaction the transaction managing the status of the sstables we're replacing
*/
- public AbstractCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
+ public AbstractCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction)
{
this.cfs = cfs;
- this.sstables = sstables;
+ this.transaction = transaction;
this.isUserDefined = false;
this.compactionType = OperationType.COMPACTION;
// enforce contract that caller should mark sstables compacting
- Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
- for (SSTableReader sstable : sstables)
+ Set<SSTableReader> compacting = transaction.tracker.getCompacting();
+ for (SSTableReader sstable : transaction.originals())
assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
}
@@ -59,10 +60,10 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
}
finally
{
- cfs.getDataTracker().unmarkCompacting(sstables);
+ transaction.close();
}
}
- public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables);
+ public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
protected abstract int executeInternal(CompactionExecutorStatsCollector collector);
@@ -80,6 +81,6 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
public String toString()
{
- return "CompactionTask(" + sstables + ")";
+ return "CompactionTask(" + transaction + ")";
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index a49a3ea..2292e01 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -24,6 +24,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -31,7 +33,7 @@ import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.OverlapIterator;
import org.apache.cassandra.utils.concurrent.Refs;
-import static org.apache.cassandra.db.DataTracker.buildIntervals;
+import static org.apache.cassandra.db.lifecycle.SSTableIntervalTree.buildIntervals;
/**
* Manage compaction options.