You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/01/07 21:12:48 UTC
cassandra git commit: Remove ref counting in SSTableScanner, fix CompactionTask ordering
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 ddca610c9 -> bdbb071f4
Remove ref counting in SSTableScanner, fix CompactionTask ordering
Patch by jmckenzie; reviewed by belliottsmith as a follow-up for CASSANDRA-8399
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdbb071f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdbb071f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdbb071f
Branch: refs/heads/cassandra-2.1
Commit: bdbb071f4f87131d6996aac52f2b75a5833d5238
Parents: ddca610
Author: Joshua McKenzie <jm...@apache.org>
Authored: Wed Jan 7 13:05:31 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Jan 7 14:05:40 2015 -0600
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionTask.java | 82 ++++++++++----------
.../cassandra/io/sstable/SSTableScanner.java | 8 +-
2 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4885bc8..d215b4c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -140,7 +140,6 @@ public class CompactionTask extends AbstractCompactionTask
try (CompactionController controller = getCompactionController(sstables);)
{
-
Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
@@ -149,11 +148,16 @@ public class CompactionTask extends AbstractCompactionTask
long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+ List<SSTableReader> newSStables;
+ AbstractCompactionIterable ci;
+
+ // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+ // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+ // See CASSANDRA-8019 and CASSANDRA-8399
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ ci = new CompactionIterable(compactionType, scanners.scanners, controller);
Iterator<AbstractCompactedRow> iter = ci.iterator();
- List<SSTableReader> newSStables;
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -215,44 +219,44 @@ public class CompactionTask extends AbstractCompactionTask
if (collector != null)
collector.finishCompaction(ci);
}
+ }
- Collection<SSTableReader> oldSStables = this.sstables;
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
- long endsize = SSTableReader.getTotalBytes(newSStables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder newSSTableNames = new StringBuilder();
- for (SSTableReader reader : newSStables)
- newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
- }
-
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+ Collection<SSTableReader> oldSStables = this.sstables;
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
}
+
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 5499195..dc065af 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -52,18 +52,15 @@ public class SSTableScanner implements ISSTableScanner
protected Iterator<OnDiskAtomIterator> iterator;
- // We can race with the sstable for deletion during compaction. If it's been ref counted to 0, skip
public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
{
- return sstable.acquireReference()
- ? new SSTableScanner(sstable, dataRange, limiter)
- : new SSTableScanner.EmptySSTableScanner(sstable.getFilename());
+ return new SSTableScanner(sstable, dataRange, limiter);
}
public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
{
// We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
- if (positions.isEmpty() || !sstable.acquireReference())
+ if (positions.isEmpty())
return new EmptySSTableScanner(sstable.getFilename());
return new SSTableScanner(sstable, tokenRanges, limiter);
@@ -173,7 +170,6 @@ public class SSTableScanner implements ISSTableScanner
public void close() throws IOException
{
FileUtils.close(dfile, ifile);
- sstable.releaseReference();
}
public long getLengthInBytes()