You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink and was not preserved in this copy.)
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/19 19:26:48 UTC
[2/3] git commit: Fix resource leak in event of corrupt sstable
Fix resource leak in event of corrupt sstable
patch by benedict; review by yukim for CASSANDRA-7932
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e831007
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e831007
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e831007
Branch: refs/heads/trunk
Commit: 0e831007760bffced8687f51b99525b650d7e193
Parents: c5c0585
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 19 18:17:19 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 19 18:17:19 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 5 +-
.../compaction/AbstractCompactionStrategy.java | 56 ++++-
.../db/compaction/CompactionManager.java | 193 +++++++++---------
.../cassandra/db/compaction/CompactionTask.java | 203 +++++++++----------
.../compaction/LeveledCompactionStrategy.java | 43 ++--
.../cassandra/db/compaction/Upgrader.java | 3 +-
.../cassandra/utils/CloseableIterator.java | 2 +-
.../LeveledCompactionStrategyTest.java | 2 +-
9 files changed, 286 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3ee7d9..f55e5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.1
+ * Fix resource leak in event of corrupt sstable (CASSANDRA-7932)
* (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
* Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
* Invalidate prepared statements when their keyspace or table is
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 857e8bd..24ea9dd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -320,7 +320,7 @@ public class DataTracker
void removeUnreadableSSTables(File directory)
{
View currentView, newView;
- List<SSTableReader> remaining = new ArrayList<>();
+ Set<SSTableReader> remaining = new HashSet<>();
do
{
currentView = view.get();
@@ -334,6 +334,9 @@ public class DataTracker
newView = currentView.replace(currentView.sstables, remaining);
}
while (!view.compareAndSet(currentView, newView));
+ for (SSTableReader sstable : currentView.sstables)
+ if (!remaining.contains(sstable))
+ sstable.releaseReference();
notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1bbc93d..97696a8 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
@@ -264,16 +265,61 @@ public abstract class AbstractCompactionStrategy
* allow for a more memory efficient solution if we know the sstable don't overlap (see
* LeveledCompactionStrategy for instance).
*/
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+ public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
RateLimiter limiter = CompactionManager.instance.getRateLimiter();
ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
- for (SSTableReader sstable : sstables)
- scanners.add(sstable.getScanner(range, limiter));
- return scanners;
+ try
+ {
+ for (SSTableReader sstable : sstables)
+ scanners.add(sstable.getScanner(range, limiter));
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ new ScannerList(scanners).close();
+ }
+ catch (Throwable t2)
+ {
+ t.addSuppressed(t2);
+ }
+ throw t;
+ }
+ return new ScannerList(scanners);
+ }
+
+ public static class ScannerList implements AutoCloseable
+ {
+ public final List<ICompactionScanner> scanners;
+ public ScannerList(List<ICompactionScanner> scanners)
+ {
+ this.scanners = scanners;
+ }
+
+ public void close()
+ {
+ Throwable t = null;
+ for (ICompactionScanner scanner : scanners)
+ {
+ try
+ {
+ scanner.close();
+ }
+ catch (Throwable t2)
+ {
+ if (t == null)
+ t = t2;
+ else
+ t.addSuppressed(t2);
+ }
+ }
+ if (t != null)
+ throw Throwables.propagate(t);
+ }
}
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
+ public ScannerList getScanners(Collection<SSTableReader> toCompact)
{
return getScanners(toCompact, null);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 296fe45..e309cfb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -870,89 +870,98 @@ public class CompactionManager implements CompactionManagerMBean
if (!cfs.isValid())
return;
- Collection<SSTableReader> sstables;
- String snapshotName = validator.desc.sessionId.toString();
- int gcBefore;
- boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
- if (isSnapshotValidation)
- {
- // If there is a snapshot created for the session then read from there.
- sstables = cfs.getSnapshotSSTableReader(snapshotName);
-
- // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
- // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
- // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
- // 'as good as in the non-snapshot' case)
- gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
- }
- else
+ Collection<SSTableReader> sstables = null;
+ try
{
- // flush first so everyone is validating data that is as similar as possible
- StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
- // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
- // instead so they won't be cleaned up if they do get compacted during the validation
- if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
- sstables = cfs.markCurrentSSTablesReferenced();
- else
- sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
- if (validator.gcBefore > 0)
- gcBefore = validator.gcBefore;
+ String snapshotName = validator.desc.sessionId.toString();
+ int gcBefore;
+ boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+ if (isSnapshotValidation)
+ {
+ // If there is a snapshot created for the session then read from there.
+ sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+ // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
+ // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
+ // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
+ // 'as good as in the non-snapshot' case)
+ gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+ }
else
- gcBefore = getDefaultGcBefore(cfs);
- }
-
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
- }
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+ {
+ // flush first so everyone is validating data that is as similar as possible
+ StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+ // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+ // instead so they won't be cleaned up if they do get compacted during the validation
+ if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+ sstables = cfs.markCurrentSSTablesReferenced();
+ else
+ sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
- CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+ if (validator.gcBefore > 0)
+ gcBefore = validator.gcBefore;
+ else
+ gcBefore = getDefaultGcBefore(cfs);
+ }
- long start = System.nanoTime();
- metrics.beginCompaction(ci);
- try
- {
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
+ // Create Merkle tree suitable to hold estimated partitions for given range.
+ // We blindly assume that partition is evenly distributed on all sstables for now.
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = iter.next();
- validator.add(row);
+ numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
}
- validator.complete();
- }
- finally
- {
- iter.close();
- SSTableReader.releaseReferences(sstables);
- if (isSnapshotValidation)
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+ int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+ long start = System.nanoTime();
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
{
- cfs.clearSnapshot(snapshotName);
+ CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ metrics.beginCompaction(ci);
+ try
+ {
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ AbstractCompactedRow row = iter.next();
+ validator.add(row);
+ }
+ validator.complete();
+ }
+ finally
+ {
+ if (isSnapshotValidation)
+ {
+ cfs.clearSnapshot(snapshotName);
+ }
+
+ metrics.finishCompaction(ci);
+ }
}
- metrics.finishCompaction(ci);
+ if (logger.isDebugEnabled())
+ {
+ // MT serialize may take time
+ long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+ duration,
+ depth,
+ numPartitions,
+ MerkleTree.serializer.serializedSize(tree, 0),
+ validator.desc);
+ }
}
-
- if (logger.isDebugEnabled())
+ finally
{
- // MT serialize may take time
- long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
- duration,
- depth,
- numPartitions,
- MerkleTree.serializer.serializedSize(tree, 0),
- validator.desc);
+ if (sstables != null)
+ SSTableReader.releaseReferences(sstables);
}
}
@@ -993,32 +1002,28 @@ public class CompactionManager implements CompactionManagerMBean
SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
-
- try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+ CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
{
repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
- try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ while(iter.hasNext())
{
- while(iter.hasNext())
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into the new repaired sstable
+ if (Range.isInRanges(row.key.getToken(), ranges))
{
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
+ {
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
}
}
// we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
@@ -1109,11 +1114,9 @@ public class CompactionManager implements CompactionManagerMBean
private static class ValidationCompactionIterable extends CompactionIterable
{
- public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range, int gcBefore)
+ public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
{
- super(OperationType.VALIDATION,
- cfs.getCompactionStrategy().getScanners(sstables, range),
- new ValidationCompactionController(cfs, gcBefore));
+ super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c1c5504..6217348 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -129,9 +130,6 @@ public class CompactionTask extends AbstractCompactionTask
UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- CompactionController controller = getCompactionController(sstables);
- Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
@@ -139,120 +137,117 @@ public class CompactionTask extends AbstractCompactionTask
long start = System.nanoTime();
long totalKeysWritten = 0;
- long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
- long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
- logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
- // TODO: errors when creating the scanners can result in untidied resources
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
- // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- long minRepairedAt = getMinRepairedAt(actuallyCompact);
- // we only need the age of the data that we're actually retaining
- long maxAge = getMaxDataAge(actuallyCompact);
- if (collector != null)
- collector.beginCompaction(ci);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
- try
+
+ try (CompactionController controller = getCompactionController(sstables);)
{
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markObsolete(sstables, compactionType);
- return;
- }
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- AbstractCompactedRow row = iter.next();
- if (writer.append(row) != null)
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+ {
+ AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+ // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+ // replace the old entries. Track entries to preheat here until then.
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
+ if (collector != null)
+ collector.beginCompaction(ci);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+ try
{
- totalKeysWritten++;
- if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ if (!iter.hasNext())
{
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so there is no period during which
+ // a crash could cause data loss.
+ cfs.markObsolete(sstables, compactionType);
+ return;
}
+
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ AbstractCompactedRow row = iter.next();
+ if (writer.append(row) != null)
+ {
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ {
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ }
+ }
+ }
+
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ writer.finish(false);
}
- }
+ catch (Throwable t)
+ {
+ writer.abort();
+ throw t;
+ }
+ finally
+ {
- // don't replace old sstables yet, as we need to mark the compaction finished in the system table
- writer.finish(false);
- }
- catch (Throwable t)
- {
- writer.abort();
- throw t;
- }
- finally
- {
- controller.close();
+ // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
- // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
+ if (collector != null)
+ collector.finishCompaction(ci);
+ }
- if (collector != null)
- collector.finishCompaction(ci);
+ Collection<SSTableReader> oldSStables = this.sstables;
+ List<SSTableReader> newSStables = writer.finished();
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
+ }
- try
- {
- // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
- // we don't end up with compaction information hanging around indefinitely in limbo.
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
}
-
- Collection<SSTableReader> oldSStables = this.sstables;
- List<SSTableReader> newSStables = writer.finished();
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
- long endsize = SSTableReader.getTotalBytes(newSStables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder newSSTableNames = new StringBuilder();
- for (SSTableReader reader : newSStables)
- newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
- }
-
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..7f2d881 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -198,7 +198,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
return maxSSTableSizeInMB * 1024L * 1024L;
}
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+ public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
for (SSTableReader sstable : sstables)
@@ -210,26 +210,41 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
}
List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
- for (Integer level : byLevel.keySet())
+ try
{
- // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
- // since we don't know which level those sstable belong yet, we simply do the same as L0 sstables.
- if (level <= 0)
+ for (Integer level : byLevel.keySet())
{
- // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
- for (SSTableReader sstable : byLevel.get(level))
- scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+ // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+ // since we don't know which level those sstable belong yet, we simply do the same as L0 sstables.
+ if (level <= 0)
+ {
+ // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
+ for (SSTableReader sstable : byLevel.get(level))
+ scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+ }
+ else
+ {
+ // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+ List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+ if (!intersecting.isEmpty())
+ scanners.add(new LeveledScanner(intersecting, range));
+ }
}
- else
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ new ScannerList(scanners).close();
+ }
+ catch (Throwable t2)
{
- // Create a LeveledScanner that only opens one sstable at a time, in sorted order
- List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
- if (!intersecting.isEmpty())
- scanners.add(new LeveledScanner(intersecting, range));
+ t.addSuppressed(t2);
}
+ throw t;
}
- return scanners;
+ return new ScannerList(scanners);
}
// Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 734fe23..f102fef 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -88,8 +88,9 @@ public class Upgrader
outputHandler.output("Upgrading " + sstable);
SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
- try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
{
+ Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 399c6d1..7474f3d 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,6 @@ import java.io.Closeable;
import java.util.Iterator;
// so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index defb087..65c7b69 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -143,7 +143,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
// get LeveledScanner for level 1 sstables
Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
- List<ICompactionScanner> scanners = strategy.getScanners(sstables);
+ List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
assertEquals(1, scanners.size()); // should be one per level
ICompactionScanner scanner = scanners.get(0);
// scan through to the end