Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/19 19:26:47 UTC
[1/3] git commit: Fix resource leak in event of corrupt sstable
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 c5c0585b4 -> 0e8310077
refs/heads/trunk 3e305f809 -> 0956a8a71
Fix resource leak in event of corrupt sstable
patch by benedict; review by yukim for CASSANDRA-7932
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e831007
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e831007
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e831007
Branch: refs/heads/cassandra-2.1
Commit: 0e831007760bffced8687f51b99525b650d7e193
Parents: c5c0585
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 19 18:17:19 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 19 18:17:19 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 5 +-
.../compaction/AbstractCompactionStrategy.java | 56 ++++-
.../db/compaction/CompactionManager.java | 193 +++++++++---------
.../cassandra/db/compaction/CompactionTask.java | 203 +++++++++----------
.../compaction/LeveledCompactionStrategy.java | 43 ++--
.../cassandra/db/compaction/Upgrader.java | 3 +-
.../cassandra/utils/CloseableIterator.java | 2 +-
.../LeveledCompactionStrategyTest.java | 2 +-
9 files changed, 286 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
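The heart of the patch is easiest to see outside the diff. Before this change, getScanners() opened one scanner per sstable in a plain loop; if opening any scanner threw (for instance on a corrupt sstable), the scanners already opened were never closed. The patch wraps the scanners in an AutoCloseable ScannerList and closes any partially built list before rethrowing. The following is a minimal, self-contained sketch of that pattern; Scanner, ScannerSource and ScannerList.open are illustrative stand-ins rather than Cassandra's real API (the real method is AbstractCompactionStrategy.getScanners, and it rethrows via Guava's Throwables.propagate).
----------------------------------------------------------------------
import java.util.ArrayList;
import java.util.List;

// Stand-in for ICompactionScanner (illustrative; the real interface also
// iterates over rows).
interface Scanner extends AutoCloseable
{
    @Override
    void close(); // narrowed to unchecked so callers need no try/catch
}

// Stand-in for SSTableReader.getScanner(...); may throw on a corrupt file.
interface ScannerSource
{
    Scanner openScanner();
}

// Mirrors AbstractCompactionStrategy.ScannerList from the patch: closing the
// list closes every scanner, chaining secondary failures as suppressed.
class ScannerList implements AutoCloseable
{
    final List<Scanner> scanners;

    ScannerList(List<Scanner> scanners)
    {
        this.scanners = scanners;
    }

    @Override
    public void close()
    {
        Throwable t = null;
        for (Scanner scanner : scanners)
        {
            try
            {
                scanner.close();
            }
            catch (Throwable t2)
            {
                if (t == null)
                    t = t2;
                else
                    t.addSuppressed(t2);
            }
        }
        if (t != null)
            throw new RuntimeException(t); // the patch uses Throwables.propagate(t)
    }

    // Mirrors the new getScanners(): if opening any scanner fails, close the
    // ones already opened before rethrowing, so nothing leaks.
    static ScannerList open(List<ScannerSource> sources)
    {
        List<Scanner> scanners = new ArrayList<>();
        try
        {
            for (ScannerSource source : sources)
                scanners.add(source.openScanner());
        }
        catch (Throwable t)
        {
            try
            {
                new ScannerList(scanners).close();
            }
            catch (Throwable t2)
            {
                t.addSuppressed(t2);
            }
            throw t;
        }
        return new ScannerList(scanners);
    }
}
----------------------------------------------------------------------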
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3ee7d9..f55e5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.1
+ * Fix resource leak in event of corrupt sstable
* (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
* Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
* Invalidate prepared statements when their keyspace or table is
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 857e8bd..24ea9dd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -320,7 +320,7 @@ public class DataTracker
void removeUnreadableSSTables(File directory)
{
View currentView, newView;
- List<SSTableReader> remaining = new ArrayList<>();
+ Set<SSTableReader> remaining = new HashSet<>();
do
{
currentView = view.get();
@@ -334,6 +334,9 @@ public class DataTracker
newView = currentView.replace(currentView.sstables, remaining);
}
while (!view.compareAndSet(currentView, newView));
+ for (SSTableReader sstable : currentView.sstables)
+ if (!remaining.contains(sstable))
+ sstable.releaseReference();
notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
}
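The DataTracker hunk above fixes the companion half of the leak: sstables dropped from the view by removeUnreadableSSTables() previously kept the reference taken on their behalf, so the underlying file handles were never tidied. A minimal sketch of the reference-counting contract this relies on, with illustrative names (SSTableReader's real acquire/release tracks more state):
----------------------------------------------------------------------
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative reference-counted reader; releaseReference() on the last
// holder is what tidies file handles. Skipping it (the old behaviour of
// removeUnreadableSSTables) pins the handle for the process lifetime.
class RefCountedReader
{
    private final AtomicInteger references = new AtomicInteger(1);

    boolean acquireReference()
    {
        while (true)
        {
            int n = references.get();
            if (n <= 0)
                return false; // already fully released
            if (references.compareAndSet(n, n + 1))
                return true;
        }
    }

    void releaseReference()
    {
        if (references.decrementAndGet() == 0)
            releaseResources();
    }

    private void releaseResources()
    {
        // close file channels, unmap buffers, delete obsolete files, etc.
    }
}
----------------------------------------------------------------------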
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1bbc93d..97696a8 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
@@ -264,16 +265,61 @@ public abstract class AbstractCompactionStrategy
* allow for a more memory-efficient solution if we know the sstables don't overlap (see
* LeveledCompactionStrategy for instance).
*/
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+ public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
RateLimiter limiter = CompactionManager.instance.getRateLimiter();
ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
- for (SSTableReader sstable : sstables)
- scanners.add(sstable.getScanner(range, limiter));
- return scanners;
+ try
+ {
+ for (SSTableReader sstable : sstables)
+ scanners.add(sstable.getScanner(range, limiter));
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ new ScannerList(scanners).close();
+ }
+ catch (Throwable t2)
+ {
+ t.addSuppressed(t2);
+ }
+ throw t;
+ }
+ return new ScannerList(scanners);
+ }
+
+ public static class ScannerList implements AutoCloseable
+ {
+ public final List<ICompactionScanner> scanners;
+ public ScannerList(List<ICompactionScanner> scanners)
+ {
+ this.scanners = scanners;
+ }
+
+ public void close()
+ {
+ Throwable t = null;
+ for (ICompactionScanner scanner : scanners)
+ {
+ try
+ {
+ scanner.close();
+ }
+ catch (Throwable t2)
+ {
+ if (t == null)
+ t = t2;
+ else
+ t.addSuppressed(t2);
+ }
+ }
+ if (t != null)
+ throw Throwables.propagate(t);
+ }
}
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
+ public ScannerList getScanners(Collection<SSTableReader> toCompact)
{
return getScanners(toCompact, null);
}
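Because ScannerList implements AutoCloseable, every call site can hold its scanners in a try-with-resources block, which is exactly what the CompactionManager, CompactionTask and Upgrader hunks below switch to. A sketch of the caller pattern, reusing the stand-in types from the sketch above:
----------------------------------------------------------------------
import java.util.List;

class CompactExample
{
    // Sketch only: the loop body stands in for CompactionIterable and the
    // sstable writer.
    static void compact(List<ScannerSource> sstables)
    {
        // close() runs on every exit path: normal completion, a corrupt-row
        // exception mid-merge, or a stop-requested abort.
        try (ScannerList scanners = ScannerList.open(sstables))
        {
            for (Scanner scanner : scanners.scanners)
            {
                // merge rows from this scanner into the output writer
            }
        }
    }
}
----------------------------------------------------------------------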
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 296fe45..e309cfb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -870,89 +870,98 @@ public class CompactionManager implements CompactionManagerMBean
if (!cfs.isValid())
return;
- Collection<SSTableReader> sstables;
- String snapshotName = validator.desc.sessionId.toString();
- int gcBefore;
- boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
- if (isSnapshotValidation)
- {
- // If there is a snapshot created for the session then read from there.
- sstables = cfs.getSnapshotSSTableReader(snapshotName);
-
- // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
- // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
- // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
- // 'as good as in the non-snapshot' case)
- gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
- }
- else
+ Collection<SSTableReader> sstables = null;
+ try
{
- // flush first so everyone is validating data that is as similar as possible
- StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
- // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
- // instead so they won't be cleaned up if they do get compacted during the validation
- if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
- sstables = cfs.markCurrentSSTablesReferenced();
- else
- sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
- if (validator.gcBefore > 0)
- gcBefore = validator.gcBefore;
+ String snapshotName = validator.desc.sessionId.toString();
+ int gcBefore;
+ boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+ if (isSnapshotValidation)
+ {
+ // If there is a snapshot created for the session then read from there.
+ sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+ // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
+ // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
+ // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
+ // 'as good as in the non-snapshot' case)
+ gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+ }
else
- gcBefore = getDefaultGcBefore(cfs);
- }
-
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
- }
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+ {
+ // flush first so everyone is validating data that is as similar as possible
+ StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+ // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+ // instead so they won't be cleaned up if they do get compacted during the validation
+ if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+ sstables = cfs.markCurrentSSTablesReferenced();
+ else
+ sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
- CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+ if (validator.gcBefore > 0)
+ gcBefore = validator.gcBefore;
+ else
+ gcBefore = getDefaultGcBefore(cfs);
+ }
- long start = System.nanoTime();
- metrics.beginCompaction(ci);
- try
- {
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
+ // Create Merkle tree suitable to hold estimated partitions for given range.
+ // We blindly assume that partition is evenly distributed on all sstables for now.
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = iter.next();
- validator.add(row);
+ numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
}
- validator.complete();
- }
- finally
- {
- iter.close();
- SSTableReader.releaseReferences(sstables);
- if (isSnapshotValidation)
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+ int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+ long start = System.nanoTime();
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
{
- cfs.clearSnapshot(snapshotName);
+ CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ metrics.beginCompaction(ci);
+ try
+ {
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ AbstractCompactedRow row = iter.next();
+ validator.add(row);
+ }
+ validator.complete();
+ }
+ finally
+ {
+ if (isSnapshotValidation)
+ {
+ cfs.clearSnapshot(snapshotName);
+ }
+
+ metrics.finishCompaction(ci);
+ }
}
- metrics.finishCompaction(ci);
+ if (logger.isDebugEnabled())
+ {
+ // MT serialize may take time
+ long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+ duration,
+ depth,
+ numPartitions,
+ MerkleTree.serializer.serializedSize(tree, 0),
+ validator.desc);
+ }
}
-
- if (logger.isDebugEnabled())
+ finally
{
- // MT serialize may take time
- long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
- duration,
- depth,
- numPartitions,
- MerkleTree.serializer.serializedSize(tree, 0),
- validator.desc);
+ if (sstables != null)
+ SSTableReader.releaseReferences(sstables);
}
}
@@ -993,32 +1002,28 @@ public class CompactionManager implements CompactionManagerMBean
SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
-
- try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+ CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
{
repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
- try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ while(iter.hasNext())
{
- while(iter.hasNext())
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into the new repaired sstable
+ if (Range.isInRanges(row.key.getToken(), ranges))
{
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
+ {
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
}
}
// we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
@@ -1109,11 +1114,9 @@ public class CompactionManager implements CompactionManagerMBean
private static class ValidationCompactionIterable extends CompactionIterable
{
- public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range, int gcBefore)
+ public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
{
- super(OperationType.VALIDATION,
- cfs.getCompactionStrategy().getScanners(sstables, range),
- new ValidationCompactionController(cfs, gcBefore));
+ super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c1c5504..6217348 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -129,9 +130,6 @@ public class CompactionTask extends AbstractCompactionTask
UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- CompactionController controller = getCompactionController(sstables);
- Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
@@ -139,120 +137,117 @@ public class CompactionTask extends AbstractCompactionTask
long start = System.nanoTime();
long totalKeysWritten = 0;
- long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
- long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
- logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
- // TODO: errors when creating the scanners can result in untidied resources
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
- // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- long minRepairedAt = getMinRepairedAt(actuallyCompact);
- // we only need the age of the data that we're actually retaining
- long maxAge = getMaxDataAge(actuallyCompact);
- if (collector != null)
- collector.beginCompaction(ci);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
- try
+
+ try (CompactionController controller = getCompactionController(sstables);)
{
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markObsolete(sstables, compactionType);
- return;
- }
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- AbstractCompactedRow row = iter.next();
- if (writer.append(row) != null)
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+ {
+ AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+ // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+ // replace the old entries. Track entries to preheat here until then.
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
+ if (collector != null)
+ collector.beginCompaction(ci);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+ try
{
- totalKeysWritten++;
- if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ if (!iter.hasNext())
{
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so there is no period during which
+ // a crash could cause data loss.
+ cfs.markObsolete(sstables, compactionType);
+ return;
}
+
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ AbstractCompactedRow row = iter.next();
+ if (writer.append(row) != null)
+ {
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ {
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ }
+ }
+ }
+
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ writer.finish(false);
}
- }
+ catch (Throwable t)
+ {
+ writer.abort();
+ throw t;
+ }
+ finally
+ {
- // don't replace old sstables yet, as we need to mark the compaction finished in the system table
- writer.finish(false);
- }
- catch (Throwable t)
- {
- writer.abort();
- throw t;
- }
- finally
- {
- controller.close();
+ // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
- // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
+ if (collector != null)
+ collector.finishCompaction(ci);
+ }
- if (collector != null)
- collector.finishCompaction(ci);
+ Collection<SSTableReader> oldSStables = this.sstables;
+ List<SSTableReader> newSStables = writer.finished();
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
+ }
- try
- {
- // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
- // we don't end up with compaction information hanging around indefinitely in limbo.
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
}
-
- Collection<SSTableReader> oldSStables = this.sstables;
- List<SSTableReader> newSStables = writer.finished();
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
- long endsize = SSTableReader.getTotalBytes(newSStables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder newSSTableNames = new StringBuilder();
- for (SSTableReader reader : newSStables)
- newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
- }
-
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..7f2d881 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -198,7 +198,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
return maxSSTableSizeInMB * 1024L * 1024L;
}
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+ public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
for (SSTableReader sstable : sstables)
@@ -210,26 +210,41 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
}
List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
- for (Integer level : byLevel.keySet())
+ try
{
- // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
- // since we don't know which level those sstables belong to yet, we simply do the same as L0 sstables.
- if (level <= 0)
+ for (Integer level : byLevel.keySet())
{
- // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
- for (SSTableReader sstable : byLevel.get(level))
- scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+ // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+ // since we don't know which level those sstables belong to yet, we simply do the same as L0 sstables.
+ if (level <= 0)
+ {
+ // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
+ for (SSTableReader sstable : byLevel.get(level))
+ scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+ }
+ else
+ {
+ // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+ List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+ if (!intersecting.isEmpty())
+ scanners.add(new LeveledScanner(intersecting, range));
+ }
}
- else
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ new ScannerList(scanners).close();
+ }
+ catch (Throwable t2)
{
- // Create a LeveledScanner that only opens one sstable at a time, in sorted order
- List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
- if (!intersecting.isEmpty())
- scanners.add(new LeveledScanner(intersecting, range));
+ t.addSuppressed(t2);
}
+ throw t;
}
- return scanners;
+ return new ScannerList(scanners);
}
// Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 734fe23..f102fef 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -88,8 +88,9 @@ public class Upgrader
outputHandler.output("Upgrading " + sstable);
SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
- try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
{
+ Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 399c6d1..7474f3d 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,6 @@ import java.io.Closeable;
import java.util.Iterator;
// so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
{
}
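On Java 7+, Closeable already extends AutoCloseable, so naming AutoCloseable here is belt-and-braces; the practical point is that a CloseableIterator can sit directly in a try-with-resources header. A minimal sketch (illustrative names; the IOException on Closeable.close() is narrowed away for brevity):
----------------------------------------------------------------------
import java.io.Closeable;
import java.util.Iterator;

// Illustrative equivalent of CloseableIterator: an iterator that owns a
// resource and can be managed by try-with-resources.
interface RowIterator<T> extends Iterator<T>, AutoCloseable, Closeable
{
    @Override
    void close(); // sketch choice: no checked exception
}

class DrainExample
{
    static <T> void drainAll(RowIterator<T> iter)
    {
        try (RowIterator<T> managed = iter)
        {
            while (managed.hasNext())
                managed.next(); // close() runs even if next() throws
        }
    }
}
----------------------------------------------------------------------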
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index defb087..65c7b69 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -143,7 +143,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
// get LeveledScanner for level 1 sstables
Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
- List<ICompactionScanner> scanners = strategy.getScanners(sstables);
+ List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
assertEquals(1, scanners.size()); // should be one per level
ICompactionScanner scanner = scanners.get(0);
// scan through to the end
[3/3] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0956a8a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0956a8a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0956a8a7
Branch: refs/heads/trunk
Commit: 0956a8a717781c8a748931f04a18a215d7d53869
Parents: 3e305f8 0e83100
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 19 18:26:23 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 19 18:26:23 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 5 +-
.../compaction/AbstractCompactionStrategy.java | 56 ++++-
.../db/compaction/CompactionManager.java | 199 +++++++++---------
.../cassandra/db/compaction/CompactionTask.java | 204 +++++++++----------
.../compaction/LeveledCompactionStrategy.java | 43 ++--
.../cassandra/db/compaction/Upgrader.java | 3 +-
.../cassandra/utils/CloseableIterator.java | 2 +-
.../LeveledCompactionStrategyTest.java | 2 +-
9 files changed, 289 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e00d990,f55e5d2..0af1681
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,5 +1,30 @@@
+3.0
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Support Java source code for user-defined functions (CASSANDRA-7562)
+ * Require arg types to disambiguate UDF drops (CASSANDRA-7812)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Verify that UDF class methods are static (CASSANDRA-7781)
+ * Support pure user-defined functions (CASSANDRA-7395, 7740)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+
+
2.1.1
+ * Fix resource leak in event of corrupt sstable
* (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
* Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
* Invalidate prepared statements when their keyspace or table is
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 28ab84e,97696a8..6a0e0df
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -19,8 -19,10 +19,9 @@@ package org.apache.cassandra.db.compact
import java.util.*;
+ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 85b7e38,e309cfb..0f8acba
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1031,78 -990,63 +1040,74 @@@ public class CompactionManager implemen
if (!new File(sstable.getFilename()).exists())
{
logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
+ i.remove();
continue;
}
+ if (groupMaxDataAge < sstable.maxDataAge)
+ groupMaxDataAge = sstable.maxDataAge;
+ }
- logger.info("Anticompacting {}", sstable);
- Set<SSTableReader> sstableAsSet = new HashSet<>();
- sstableAsSet.add(sstable);
+ if (anticompactionGroup.size() == 0)
+ {
+ logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
+ return 0;
+ }
- File destination = cfs.directories.getDirectoryForNewSSTables();
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+ logger.info("Anticompacting {}", anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
- CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
- {
- repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
- unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+ File destination = cfs.directories.getDirectoryForNewSSTables();
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<ICompactionScanner> scanners = strategy.getScanners(anticompactionGroup);
-
- int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
-
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- while(iter.hasNext())
+ long repairedKeyCount = 0;
+ long unrepairedKeyCount = 0;
- try (CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
++ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
++ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
++ CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ {
++ int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
++
+ repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+ unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
+
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
- try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
++ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
++ Iterator<AbstractCompactedRow> iter = ci.iterator();
++ while(iter.hasNext())
+ {
- while(iter.hasNext())
++ AbstractCompactedRow row = iter.next();
++ // if current range from sstable is repaired, save it into the new repaired sstable
++ if (Range.isInRanges(row.key.getToken(), ranges))
{
-- AbstractCompactedRow row = iter.next();
-- // if current range from sstable is repaired, save it into the new repaired sstable
-- if (Range.isInRanges(row.key.getToken(), ranges))
-- {
-- repairedSSTableWriter.append(row);
-- repairedKeyCount++;
-- }
-- // otherwise save into the new 'non-repaired' table
-- else
-- {
-- unRepairedSSTableWriter.append(row);
-- unrepairedKeyCount++;
-- }
++ repairedSSTableWriter.append(row);
++ repairedKeyCount++;
++ }
++ // otherwise save into the new 'non-repaired' table
++ else
++ {
++ unRepairedSSTableWriter.append(row);
++ unrepairedKeyCount++;
}
- // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
- // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
- repairedSSTableWriter.finish(false, repairedAt);
- unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
- // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
- anticompactedSSTables.addAll(repairedSSTableWriter.finished());
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
- }
- catch (Throwable e)
- {
- logger.error("Error anticompacting " + sstable, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
}
+ // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
+ // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+ repairedSSTableWriter.finish(false, repairedAt);
+ unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
+ // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
+ logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
+ repairedKeyCount + unrepairedKeyCount,
+ cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(),
+ anticompactionGroup);
+ return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size();
}
- String format = "Repaired {} keys of {} for {}/{}";
- logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
- String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
-
- return anticompactedSSTables;
+ catch (Throwable e)
+ {
+ logger.error("Error anticompacting " + anticompactionGroup, e);
+ repairedSSTableWriter.abort();
+ unRepairedSSTableWriter.abort();
+ }
+ return 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4a22d0c,6217348..527f483
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -20,7 -20,9 +20,8 @@@ package org.apache.cassandra.db.compact
import java.io.File;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
+ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@@ -43,8 -47,7 +44,7 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.service.ActiveRepairService;
--import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.UUIDGen;
public class CompactionTask extends AbstractCompactionTask
{
@@@ -144,120 -137,117 +141,117 @@@
long start = System.nanoTime();
long totalKeysWritten = 0;
- long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
- long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
- logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
- // TODO: errors when creating the scanners can result in untidied resources
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
- // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- long minRepairedAt = getMinRepairedAt(actuallyCompact);
- // we only need the age of the data that we're actually retaining
- long maxAge = getMaxDataAge(actuallyCompact);
- if (collector != null)
- collector.beginCompaction(ci);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
- try
+
+ try (CompactionController controller = getCompactionController(sstables);)
{
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markObsolete(sstables, compactionType);
- return;
- }
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- AbstractCompactedRow row = iter.next();
- if (writer.append(row) != null)
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+ {
+ AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+ // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+ // replace the old entries. Track entries to preheat here until then.
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
+ if (collector != null)
+ collector.beginCompaction(ci);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+ try
{
- totalKeysWritten++;
- if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ if (!iter.hasNext())
{
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so there is no period during which
+ // a crash could cause data loss.
+ cfs.markObsolete(sstables, compactionType);
+ return;
}
+
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ AbstractCompactedRow row = iter.next();
+ if (writer.append(row) != null)
+ {
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ {
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ }
+ }
+ }
+
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ writer.finish(false);
}
- }
+ catch (Throwable t)
+ {
+ writer.abort();
+ throw t;
+ }
+ finally
+ {
- // don't replace old sstables yet, as we need to mark the compaction finished in the system table
- writer.finish(false);
- }
- catch (Throwable t)
- {
- writer.abort();
- throw t;
- }
- finally
- {
- controller.close();
+ // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
- // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
+ if (collector != null)
+ collector.finishCompaction(ci);
+ }
- if (collector != null)
- collector.finishCompaction(ci);
+ Collection<SSTableReader> oldSStables = this.sstables;
+ List<SSTableReader> newSStables = writer.finished();
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
+ }
- try
- {
- // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
- // we don't end up with compaction information hanging around indefinitely in limbo.
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
++ logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
++ taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
}
-
- Collection<SSTableReader> oldSStables = this.sstables;
- List<SSTableReader> newSStables = writer.finished();
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
- long endsize = SSTableReader.getTotalBytes(newSStables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder newSSTableNames = new StringBuilder();
- for (SSTableReader reader : newSStables)
- newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
- }
-
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
[2/3] git commit: Fix resource leak in event of corrupt sstable
Posted by be...@apache.org.
Fix resource leak in event of corrupt sstable
patch by benedict; review by yukim for CASSANDRA-7932
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e831007
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e831007
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e831007
Branch: refs/heads/trunk
Commit: 0e831007760bffced8687f51b99525b650d7e193
Parents: c5c0585
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 19 18:17:19 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 19 18:17:19 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 5 +-
.../compaction/AbstractCompactionStrategy.java | 56 ++++-
.../db/compaction/CompactionManager.java | 193 +++++++++---------
.../cassandra/db/compaction/CompactionTask.java | 203 +++++++++----------
.../compaction/LeveledCompactionStrategy.java | 43 ++--
.../cassandra/db/compaction/Upgrader.java | 3 +-
.../cassandra/utils/CloseableIterator.java | 2 +-
.../LeveledCompactionStrategyTest.java | 2 +-
9 files changed, 286 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3ee7d9..f55e5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.1
+ * Fix resource leak in event of corrupt sstable
* (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
* Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
* Invalidate prepared statements when their keyspace or table is
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 857e8bd..24ea9dd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -320,7 +320,7 @@ public class DataTracker
void removeUnreadableSSTables(File directory)
{
View currentView, newView;
- List<SSTableReader> remaining = new ArrayList<>();
+ Set<SSTableReader> remaining = new HashSet<>();
do
{
currentView = view.get();
@@ -334,6 +334,9 @@ public class DataTracker
newView = currentView.replace(currentView.sstables, remaining);
}
while (!view.compareAndSet(currentView, newView));
+ for (SSTableReader sstable : currentView.sstables)
+ if (!remaining.contains(sstable))
+ sstable.releaseReference();
notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
}
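
Note: the two changes above work together: making `remaining` a HashSet turns the membership test in the new loop into an O(1) lookup, and releasing a reference for each sstable that was in the replaced view but not retained is what plugs the leak when unreadable sstables are dropped. A minimal sketch of the same swap-then-release pattern, with illustrative types (TrackedView and RefCounted are not from the codebase):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch only: atomically swap the view, then release one reference per
    // element that was dropped from it.
    final class TrackedView<T extends TrackedView.RefCounted>
    {
        interface RefCounted { void releaseReference(); }

        private final AtomicReference<Set<T>> view =
            new AtomicReference<Set<T>>(new HashSet<T>());

        void retainOnly(Set<T> remaining)
        {
            Set<T> replaced;
            do
            {
                replaced = view.get();
            }
            while (!view.compareAndSet(replaced, remaining));

            // 'replaced' is the view we actually swapped out, so we release
            // exactly once per dropped element even if the CAS loop retried
            for (T element : replaced)
                if (!remaining.contains(element)) // O(1): remaining is a HashSet
                    element.releaseReference();
        }
    }
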
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1bbc93d..97696a8 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
@@ -264,16 +265,61 @@ public abstract class AbstractCompactionStrategy
* allow for a more memory-efficient solution if we know the sstables don't overlap (see
* LeveledCompactionStrategy for instance).
*/
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+ public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
RateLimiter limiter = CompactionManager.instance.getRateLimiter();
ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
- for (SSTableReader sstable : sstables)
- scanners.add(sstable.getScanner(range, limiter));
- return scanners;
+ try
+ {
+ for (SSTableReader sstable : sstables)
+ scanners.add(sstable.getScanner(range, limiter));
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ new ScannerList(scanners).close();
+ }
+ catch (Throwable t2)
+ {
+ t.addSuppressed(t2);
+ }
+ throw t;
+ }
+ return new ScannerList(scanners);
+ }
+
+ public static class ScannerList implements AutoCloseable
+ {
+ public final List<ICompactionScanner> scanners;
+ public ScannerList(List<ICompactionScanner> scanners)
+ {
+ this.scanners = scanners;
+ }
+
+ public void close()
+ {
+ Throwable t = null;
+ for (ICompactionScanner scanner : scanners)
+ {
+ try
+ {
+ scanner.close();
+ }
+ catch (Throwable t2)
+ {
+ if (t == null)
+ t = t2;
+ else
+ t.addSuppressed(t2);
+ }
+ }
+ if (t != null)
+ throw Throwables.propagate(t);
+ }
}
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
+ public ScannerList getScanners(Collection<SSTableReader> toCompact)
{
return getScanners(toCompact, null);
}
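
Note: returning an AutoCloseable ScannerList instead of a bare List is what lets every caller below adopt try-with-resources: close() walks all scanners and chains any secondary failure onto the first via addSuppressed instead of masking it. A hypothetical caller (process() is illustrative, not a real method):

    try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(sstables))
    {
        for (ICompactionScanner scanner : scanners.scanners)
            process(scanner); // real callers feed these to a CompactionIterable
    }
    // scanners.close() has run here, whether the body completed or threw
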
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 296fe45..e309cfb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -870,89 +870,98 @@ public class CompactionManager implements CompactionManagerMBean
if (!cfs.isValid())
return;
- Collection<SSTableReader> sstables;
- String snapshotName = validator.desc.sessionId.toString();
- int gcBefore;
- boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
- if (isSnapshotValidation)
- {
- // If there is a snapshot created for the session then read from there.
- sstables = cfs.getSnapshotSSTableReader(snapshotName);
-
- // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
- // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
- // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
- // 'as good as in the non-snapshot' case)
- gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
- }
- else
+ Collection<SSTableReader> sstables = null;
+ try
{
- // flush first so everyone is validating data that is as similar as possible
- StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
- // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
- // instead so they won't be cleaned up if they do get compacted during the validation
- if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
- sstables = cfs.markCurrentSSTablesReferenced();
- else
- sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
- if (validator.gcBefore > 0)
- gcBefore = validator.gcBefore;
+ String snapshotName = validator.desc.sessionId.toString();
+ int gcBefore;
+ boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+ if (isSnapshotValidation)
+ {
+ // If there is a snapshot created for the session then read from there.
+ sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+ // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
+ // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
+ // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
+ // 'as good as in the non-snapshot' case)
+ gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+ }
else
- gcBefore = getDefaultGcBefore(cfs);
- }
-
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
- }
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+ {
+ // flush first so everyone is validating data that is as similar as possible
+ StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+ // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+ // instead so they won't be cleaned up if they do get compacted during the validation
+ if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+ sstables = cfs.markCurrentSSTablesReferenced();
+ else
+ sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
- CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+ if (validator.gcBefore > 0)
+ gcBefore = validator.gcBefore;
+ else
+ gcBefore = getDefaultGcBefore(cfs);
+ }
- long start = System.nanoTime();
- metrics.beginCompaction(ci);
- try
- {
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
+ // Create Merkle tree suitable to hold estimated partitions for given range.
+ // We blindly assume that partition is evenly distributed on all sstables for now.
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = iter.next();
- validator.add(row);
+ numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
}
- validator.complete();
- }
- finally
- {
- iter.close();
- SSTableReader.releaseReferences(sstables);
- if (isSnapshotValidation)
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+ int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+ long start = System.nanoTime();
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
{
- cfs.clearSnapshot(snapshotName);
+ CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ metrics.beginCompaction(ci);
+ try
+ {
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ AbstractCompactedRow row = iter.next();
+ validator.add(row);
+ }
+ validator.complete();
+ }
+ finally
+ {
+ if (isSnapshotValidation)
+ {
+ cfs.clearSnapshot(snapshotName);
+ }
+
+ metrics.finishCompaction(ci);
+ }
}
- metrics.finishCompaction(ci);
+ if (logger.isDebugEnabled())
+ {
+ // MT serialize may take time
+ long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+ duration,
+ depth,
+ numPartitions,
+ MerkleTree.serializer.serializedSize(tree, 0),
+ validator.desc);
+ }
}
-
- if (logger.isDebugEnabled())
+ finally
{
- // MT serialize may take time
- long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
- duration,
- depth,
- numPartitions,
- MerkleTree.serializer.serializedSize(tree, 0),
- validator.desc);
+ if (sstables != null)
+ SSTableReader.releaseReferences(sstables);
}
}
@@ -993,32 +1002,28 @@ public class CompactionManager implements CompactionManagerMBean
SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
-
- try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+ CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
{
repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
- try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ while (iter.hasNext())
{
- while(iter.hasNext())
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into the new repaired sstable
+ if (Range.isInRanges(row.key.getToken(), ranges))
{
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
+ {
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
}
}
// we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
@@ -1109,11 +1114,9 @@ public class CompactionManager implements CompactionManagerMBean
private static class ValidationCompactionIterable extends CompactionIterable
{
- public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range, int gcBefore)
+ public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
{
- super(OperationType.VALIDATION,
- cfs.getCompactionStrategy().getScanners(sstables, range),
- new ValidationCompactionController(cfs, gcBefore));
+ super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
}
}
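
Note: one piece of the moved code that merits a worked example is the Merkle tree sizing: depth is the natural log of the estimated partition count, floored and capped at 20, and the tree may split into at most 2^depth ranges. With made-up numbers:

    long numPartitions = 1_000_000;   // estimated from the sstables being validated
    int depth = numPartitions > 0
              ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) // ln(1e6) ~ 13.8 -> 13
              : 0;
    int maxRanges = (int) Math.pow(2, depth);                           // 2^13 = 8192
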
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c1c5504..6217348 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -129,9 +130,6 @@ public class CompactionTask extends AbstractCompactionTask
UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- CompactionController controller = getCompactionController(sstables);
- Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
@@ -139,120 +137,117 @@ public class CompactionTask extends AbstractCompactionTask
long start = System.nanoTime();
long totalKeysWritten = 0;
- long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
- long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
- logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
- // TODO: errors when creating the scanners can result in untidied resources
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
- // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- long minRepairedAt = getMinRepairedAt(actuallyCompact);
- // we only need the age of the data that we're actually retaining
- long maxAge = getMaxDataAge(actuallyCompact);
- if (collector != null)
- collector.beginCompaction(ci);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
- try
+
+ try (CompactionController controller = getCompactionController(sstables))
{
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markObsolete(sstables, compactionType);
- return;
- }
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- AbstractCompactedRow row = iter.next();
- if (writer.append(row) != null)
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+ {
+ AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+ // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+ // replace the old entries. Track entries to preheat here until then.
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
+ if (collector != null)
+ collector.beginCompaction(ci);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+ try
{
- totalKeysWritten++;
- if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ if (!iter.hasNext())
{
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so there is no period during which
+ // a crash could cause data loss.
+ cfs.markObsolete(sstables, compactionType);
+ return;
}
+
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ AbstractCompactedRow row = iter.next();
+ if (writer.append(row) != null)
+ {
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ {
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+ }
+ }
+ }
+
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ writer.finish(false);
}
- }
+ catch (Throwable t)
+ {
+ writer.abort();
+ throw t;
+ }
+ finally
+ {
- // don't replace old sstables yet, as we need to mark the compaction finished in the system table
- writer.finish(false);
- }
- catch (Throwable t)
- {
- writer.abort();
- throw t;
- }
- finally
- {
- controller.close();
+ // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
- // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
+ if (collector != null)
+ collector.finishCompaction(ci);
+ }
- if (collector != null)
- collector.finishCompaction(ci);
+ Collection<SSTableReader> oldSStables = this.sstables;
+ List<SSTableReader> newSStables = writer.finished();
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
+ }
- try
- {
- // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
- // we don't end up with compaction information hanging around indefinitely in limbo.
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
}
-
- Collection<SSTableReader> oldSStables = this.sstables;
- List<SSTableReader> newSStables = writer.finished();
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
- long endsize = SSTableReader.getTotalBytes(newSStables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder newSSTableNames = new StringBuilder();
- for (SSTableReader reader : newSStables)
- newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
- }
-
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
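
Note: the reshuffle above swaps the old manual controller.close() in a finally block for nested try-with-resources, so a failure while constructing the scanners still closes the controller. The nesting collapses to a single resource list; a sketch of the equivalent shape using the patch's own names (actuallyCompact inlined only for brevity):

    try (CompactionController controller = getCompactionController(sstables);
         AbstractCompactionStrategy.ScannerList scanners =
             strategy.getScanners(Sets.difference(sstables, controller.getFullyExpiredSSTables())))
    {
        // ... build the CompactionIterable and drive the SSTableRewriter ...
    }
    // resources close in reverse declaration order: scanners first, then the
    // controller; an exception from getScanners() still closes the controller
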
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..7f2d881 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -198,7 +198,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
return maxSSTableSizeInMB * 1024L * 1024L;
}
- public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+ public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
for (SSTableReader sstable : sstables)
@@ -210,26 +210,41 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
}
List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
- for (Integer level : byLevel.keySet())
+ try
{
- // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
- // since we don't know which level those sstables belong to yet, we simply do the same as for L0 sstables.
- if (level <= 0)
+ for (Integer level : byLevel.keySet())
{
- // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
- for (SSTableReader sstable : byLevel.get(level))
- scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+ // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+ // since we don't know which level those sstables belong to yet, we simply do the same as for L0 sstables.
+ if (level <= 0)
+ {
+ // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
+ for (SSTableReader sstable : byLevel.get(level))
+ scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+ }
+ else
+ {
+ // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+ List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+ if (!intersecting.isEmpty())
+ scanners.add(new LeveledScanner(intersecting, range));
+ }
}
- else
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ new ScannerList(scanners).close();
+ }
+ catch (Throwable t2)
{
- // Create a LeveledScanner that only opens one sstable at a time, in sorted order
- List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
- if (!intersecting.isEmpty())
- scanners.add(new LeveledScanner(intersecting, range));
+ t.addSuppressed(t2);
}
+ throw t;
}
- return scanners;
+ return new ScannerList(scanners);
}
// Lazily creates SSTableBoundedScanner for sstables that are assumed to be from the
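
Note: both getScanners overloads now share the same guard: accumulate scanners, and on the first failure close everything opened so far before rethrowing, keeping the original exception primary. Reduced to a standalone helper (openScannersSafely is illustrative, not in the patch; types and imports as in AbstractCompactionStrategy.java):

    // Sketch: open scanners one at a time; unwind on the first failure.
    static List<ICompactionScanner> openScannersSafely(Collection<SSTableReader> sstables,
                                                       Range<Token> range,
                                                       RateLimiter limiter)
    {
        List<ICompactionScanner> opened = new ArrayList<>();
        try
        {
            for (SSTableReader sstable : sstables)
                opened.add(sstable.getScanner(range, limiter)); // may fail mid-loop on corruption
            return opened;
        }
        catch (Throwable t)
        {
            for (ICompactionScanner scanner : opened)
            {
                try { scanner.close(); }
                catch (Throwable t2) { t.addSuppressed(t2); }
            }
            throw Throwables.propagate(t); // same escape hatch ScannerList.close() uses
        }
    }
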
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 734fe23..f102fef 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -88,8 +88,9 @@ public class Upgrader
outputHandler.output("Upgrading " + sstable);
SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
- try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
{
+ Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 399c6d1..7474f3d 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,6 @@ import java.io.Closeable;
import java.util.Iterator;
// so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
{
}
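
Note: since Java 7, java.io.Closeable already extends AutoCloseable, so the edit above is declarative rather than functional, but it makes the intent explicit: these iterators are meant for try-with-resources. A hypothetical use (openIterator() and handle() are illustrative):

    try (CloseableIterator<AbstractCompactedRow> iter = openIterator())
    {
        while (iter.hasNext())
            handle(iter.next());
    } // close() runs automatically, even when handle() throws
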
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index defb087..65c7b69 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -143,7 +143,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
// get LeveledScanner for level 1 sstables
Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
- List<ICompactionScanner> scanners = strategy.getScanners(sstables);
+ List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
assertEquals(1, scanners.size()); // should be one per level
ICompactionScanner scanner = scanners.get(0);
// scan through to the end