You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/01/07 21:12:48 UTC

cassandra git commit: Remove ref counting in SSTableScanner, fix CompactionTask ordering

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 ddca610c9 -> bdbb071f4


Remove ref counting in SSTableScanner, fix CompactionTask ordering

Patch by jmckenzie; reviewed by belliottsmith as a follow-up for CASSANDRA-8399


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdbb071f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdbb071f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdbb071f

Branch: refs/heads/cassandra-2.1
Commit: bdbb071f4f87131d6996aac52f2b75a5833d5238
Parents: ddca610
Author: Joshua McKenzie <jm...@apache.org>
Authored: Wed Jan 7 13:05:31 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Jan 7 14:05:40 2015 -0600

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java | 82 ++++++++++----------
 .../cassandra/io/sstable/SSTableScanner.java    |  8 +-
 2 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4885bc8..d215b4c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -140,7 +140,6 @@ public class CompactionTask extends AbstractCompactionTask
 
         try (CompactionController controller = getCompactionController(sstables);)
         {
-
             Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
 
             long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
@@ -149,11 +148,16 @@ public class CompactionTask extends AbstractCompactionTask
             long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
+            List<SSTableReader> newSStables;
+            AbstractCompactionIterable ci;
+
+            // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+            // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+            // See CASSANDRA-8019 and CASSANDRA-8399
             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
             {
-                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                ci = new CompactionIterable(compactionType, scanners.scanners, controller);
                 Iterator<AbstractCompactedRow> iter = ci.iterator();
-                List<SSTableReader> newSStables;
                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
                 // replace the old entries.  Track entries to preheat here until then.
                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -215,44 +219,44 @@ public class CompactionTask extends AbstractCompactionTask
                     if (collector != null)
                         collector.finishCompaction(ci);
                 }
+            }
 
-                Collection<SSTableReader> oldSStables = this.sstables;
-                if (!offline)
-                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
-                // log a bunch of statistics about the result and save to system table compaction_history
-                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                long startsize = SSTableReader.getTotalBytes(oldSStables);
-                long endsize = SSTableReader.getTotalBytes(newSStables);
-                double ratio = (double) endsize / (double) startsize;
-
-                StringBuilder newSSTableNames = new StringBuilder();
-                for (SSTableReader reader : newSStables)
-                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
-                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-                long totalSourceRows = 0;
-                long[] counts = ci.getMergedRowCounts();
-                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-                Map<Integer, Long> mergedRows = new HashMap<>();
-                for (int i = 0; i < counts.length; i++)
-                {
-                    long count = counts[i];
-                    if (count == 0)
-                        continue;
-
-                    int rows = i + 1;
-                    totalSourceRows += rows * count;
-                    mergeSummary.append(String.format("%d:%d, ", rows, count));
-                    mergedRows.put(rows, count);
-                }
-
-                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+            Collection<SSTableReader> oldSStables = this.sstables;
+            if (!offline)
+                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+            // log a bunch of statistics about the result and save to system table compaction_history
+            long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            long startsize = SSTableReader.getTotalBytes(oldSStables);
+            long endsize = SSTableReader.getTotalBytes(newSStables);
+            double ratio = (double) endsize / (double) startsize;
+
+            StringBuilder newSSTableNames = new StringBuilder();
+            for (SSTableReader reader : newSStables)
+                newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+            double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+            long totalSourceRows = 0;
+            long[] counts = ci.getMergedRowCounts();
+            StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+            Map<Integer, Long> mergedRows = new HashMap<>();
+            for (int i = 0; i < counts.length; i++)
+            {
+                long count = counts[i];
+                if (count == 0)
+                    continue;
+
+                int rows = i + 1;
+                totalSourceRows += rows * count;
+                mergeSummary.append(String.format("%d:%d, ", rows, count));
+                mergedRows.put(rows, count);
             }
+
+            SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+            logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+                                      oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+            logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+            logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 5499195..dc065af 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -52,18 +52,15 @@ public class SSTableScanner implements ISSTableScanner
 
     protected Iterator<OnDiskAtomIterator> iterator;
 
-    // We can race with the sstable for deletion during compaction.  If it's been ref counted to 0, skip
     public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
     {
-        return sstable.acquireReference()
-                ? new SSTableScanner(sstable, dataRange, limiter)
-                : new SSTableScanner.EmptySSTableScanner(sstable.getFilename());
+        return new SSTableScanner(sstable, dataRange, limiter);
     }
     public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
     {
         // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
         List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
-        if (positions.isEmpty() || !sstable.acquireReference())
+        if (positions.isEmpty())
             return new EmptySSTableScanner(sstable.getFilename());
 
         return new SSTableScanner(sstable, tokenRanges, limiter);
@@ -173,7 +170,6 @@ public class SSTableScanner implements ISSTableScanner
     public void close() throws IOException
     {
         FileUtils.close(dfile, ifile);
-        sstable.releaseReference();
     }
 
     public long getLengthInBytes()