You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/03/26 23:02:07 UTC
[2/5] git commit: cut down on the number of sstables compared for version and purge checks
cut down on the number of sstables compared for version and purge checks
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45af95ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45af95ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45af95ab
Branch: refs/heads/cassandra-1.1
Commit: 45af95ab30d63829342f26c237b5c2cd186cf5e3
Parents: 18b5564
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Mar 23 12:48:56 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Mar 26 15:53:22 2012 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/db/ColumnFamilyStore.java | 34 ++++++------
src/java/org/apache/cassandra/db/DataTracker.java | 16 +++---
.../db/compaction/CompactionController.java | 39 ++++++++-------
3 files changed, 46 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 81fa285..4475070 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,10 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import javax.management.*;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +66,7 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.IntervalTree.Interval;
+import org.apache.cassandra.utils.IntervalTree.IntervalTree;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import static org.apache.cassandra.config.CFMetaData.Caching;
@@ -844,23 +842,25 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
/**
- * Uses bloom filters to check if key may be present in any sstable in this
- * ColumnFamilyStore, minus a set of provided ones.
- *
- * Because BFs are checked, negative returns ensure that the key is not
- * present in the checked SSTables, but positive ones doesn't ensure key
- * presence.
+ * @param sstables
+ * @return sstables whose key range overlaps with that of the given sstables, not including itself.
+ * (The given sstables may or may not overlap with each other.)
*/
- public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends SSTable> sstablesToIgnore)
+ public Set<SSTableReader> getOverlappingSSTables(Collection<SSTableReader> sstables)
{
- // we don't need to acquire references here, since the bloom filter is safe to use even post-compaction
- List<SSTableReader> filteredSSTables = data.getView().intervalTree.search(new Interval(key, key));
- for (SSTableReader sstable : filteredSSTables)
+ assert !sstables.isEmpty();
+ IntervalTree<SSTableReader> tree = data.getView().intervalTree;
+
+ Set<SSTableReader> results = null;
+ for (SSTableReader sstable : sstables)
{
- if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key))
- return true;
+ Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(new Interval<SSTableReader>(sstable.first, sstable.last)));
+ assert overlaps.contains(sstable);
+ results = results == null ? overlaps : Sets.union(results, overlaps);
}
- return false;
+ results = Sets.difference(results, ImmutableSet.copyOf(sstables));
+
+ return results;
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index d6b5fe5..71577b2 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -552,6 +552,14 @@ public class DataTracker
assert found : consumer + " not subscribed";
}
+ public static IntervalTree<SSTableReader> buildIntervalTree(Iterable<SSTableReader> sstables)
+ {
+ List<Interval> intervals = new ArrayList<Interval>(Iterables.size(sstables));
+ for (SSTableReader sstable : sstables)
+ intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
+ return new IntervalTree<SSTableReader>(intervals);
+ }
+
/**
* An immutable structure holding the current memtable, the memtables pending
* flush, the sstables for a column family, and the sstables that are active
@@ -584,14 +592,6 @@ public class DataTracker
return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
}
- private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
- {
- List<Interval> intervals = new ArrayList<Interval>(sstables.size());
- for (SSTableReader sstable : sstables)
- intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
- return new IntervalTree<SSTableReader>(intervals);
- }
-
public View switchMemtable(Memtable newMemtable)
{
Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 7db5723..1da6f9c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -21,17 +21,17 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
+import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.EchoedRow;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.utils.IntervalTree.Interval;
+import org.apache.cassandra.utils.IntervalTree.IntervalTree;
/**
* Manage compaction options.
@@ -41,8 +41,8 @@ public class CompactionController
private static Logger logger = LoggerFactory.getLogger(CompactionController.class);
private final ColumnFamilyStore cfs;
- private final Set<SSTableReader> sstables;
- private final boolean forceDeserialize;
+ private final boolean deserializeRequired;
+ private final IntervalTree<SSTableReader> overlappingTree;
public final int gcBefore;
public boolean keyExistenceIsExpensive;
@@ -52,15 +52,16 @@ public class CompactionController
{
assert cfs != null;
this.cfs = cfs;
- this.sstables = new HashSet<SSTableReader>(sstables);
this.gcBefore = gcBefore;
// If we merge an old NodeId id, we must make sure that no further increment for that id are in an active memtable.
// For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We
// add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
// current 'stop all write during memtable switch' situation).
this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
- this.forceDeserialize = forceDeserialize;
- keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(this.sstables);
+ deserializeRequired = forceDeserialize || !allLatestVersion(sstables);
+ Set<SSTableReader> overlappingSSTables = cfs.getOverlappingSSTables(sstables);
+ overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
+ keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables));
}
public String getKeyspace()
@@ -79,19 +80,21 @@ public class CompactionController
*/
public boolean shouldPurge(DecoratedKey key)
{
- return !cfs.isKeyInRemainingSSTables(key, sstables);
+ List<SSTableReader> filteredSSTables = overlappingTree.search(new Interval(key, key));
+ for (SSTableReader sstable : filteredSSTables)
+ {
+ if (sstable.getBloomFilter().isPresent(key.key))
+ return false;
+ }
+ return true;
}
- public boolean needDeserialize()
+ private static boolean allLatestVersion(Iterable<SSTableReader> sstables)
{
- if (forceDeserialize)
- return true;
-
for (SSTableReader sstable : sstables)
if (!sstable.descriptor.isLatestVersion)
- return true;
-
- return false;
+ return false;
+ return true;
}
public void invalidateCachedRow(DecoratedKey key)
@@ -128,7 +131,7 @@ public class CompactionController
// in-memory echoedrow is only enabled if we think checking for the key's existence in the other sstables,
// is going to be less expensive than simply de/serializing the row again
- if (rows.size() == 1 && !needDeserialize()
+ if (rows.size() == 1 && !deserializeRequired
&& (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit() || !keyExistenceIsExpensive)
&& !shouldPurge(rows.get(0).getKey()))
{