You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/03/26 23:02:07 UTC

[2/5] git commit: cut down on the number of sstables compared for version and purge checks

cut down on the number of sstables compared for version and purge checks


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45af95ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45af95ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45af95ab

Branch: refs/heads/cassandra-1.1
Commit: 45af95ab30d63829342f26c237b5c2cd186cf5e3
Parents: 18b5564
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Mar 23 12:48:56 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Mar 26 15:53:22 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   34 ++++++------
 src/java/org/apache/cassandra/db/DataTracker.java  |   16 +++---
 .../db/compaction/CompactionController.java        |   39 ++++++++-------
 3 files changed, 46 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 81fa285..4475070 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,10 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import javax.management.*;
 
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +66,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.IntervalTree.Interval;
+import org.apache.cassandra.utils.IntervalTree.IntervalTree;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import static org.apache.cassandra.config.CFMetaData.Caching;
@@ -844,23 +842,25 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * Uses bloom filters to check if key may be present in any sstable in this
-     * ColumnFamilyStore, minus a set of provided ones.
-     *
-     * Because BFs are checked, negative returns ensure that the key is not
-     * present in the checked SSTables, but positive ones doesn't ensure key
-     * presence.
+     * @param sstables the sstables to find overlaps for; must not be empty
+     * @return sstables whose key range overlaps with that of any of the given sstables, not including the given sstables themselves.
+     * (The given sstables may or may not overlap with each other.)
      */
-    public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends SSTable> sstablesToIgnore)
+    public Set<SSTableReader> getOverlappingSSTables(Collection<SSTableReader> sstables)
     {
-        // we don't need to acquire references here, since the bloom filter is safe to use even post-compaction
-        List<SSTableReader> filteredSSTables = data.getView().intervalTree.search(new Interval(key, key));
-        for (SSTableReader sstable : filteredSSTables)
+        assert !sstables.isEmpty();
+        IntervalTree<SSTableReader> tree = data.getView().intervalTree;
+
+        Set<SSTableReader> results = null;
+        for (SSTableReader sstable : sstables)
         {
-            if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key))
-                return true;
+            Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(new Interval<SSTableReader>(sstable.first, sstable.last)));
+            assert overlaps.contains(sstable);
+            results = results == null ? overlaps : Sets.union(results, overlaps);
         }
-        return false;
+        results = Sets.difference(results, ImmutableSet.copyOf(sstables));
+
+        return results;
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index d6b5fe5..71577b2 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -552,6 +552,14 @@ public class DataTracker
         assert found : consumer + " not subscribed";
     }
 
+    public static IntervalTree<SSTableReader> buildIntervalTree(Iterable<SSTableReader> sstables)
+    {
+        List<Interval> intervals = new ArrayList<Interval>(Iterables.size(sstables));
+        for (SSTableReader sstable : sstables)
+            intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
+        return new IntervalTree<SSTableReader>(intervals);
+    }
+
     /**
      * An immutable structure holding the current memtable, the memtables pending
      * flush, the sstables for a column family, and the sstables that are active
@@ -584,14 +592,6 @@ public class DataTracker
             return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
         }
 
-        private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
-        {
-            List<Interval> intervals = new ArrayList<Interval>(sstables.size());
-            for (SSTableReader sstable : sstables)
-                intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
-            return new IntervalTree<SSTableReader>(intervals);
-        }
-
         public View switchMemtable(Memtable newMemtable)
         {
             Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 7db5723..1da6f9c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -21,17 +21,17 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.EchoedRow;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.utils.IntervalTree.Interval;
+import org.apache.cassandra.utils.IntervalTree.IntervalTree;
 
 /**
  * Manage compaction options.
@@ -41,8 +41,8 @@ public class CompactionController
     private static Logger logger = LoggerFactory.getLogger(CompactionController.class);
 
     private final ColumnFamilyStore cfs;
-    private final Set<SSTableReader> sstables;
-    private final boolean forceDeserialize;
+    private final boolean deserializeRequired;
+    private final IntervalTree<SSTableReader> overlappingTree;
 
     public final int gcBefore;
     public boolean keyExistenceIsExpensive;
@@ -52,15 +52,16 @@ public class CompactionController
     {
         assert cfs != null;
         this.cfs = cfs;
-        this.sstables = new HashSet<SSTableReader>(sstables);
         this.gcBefore = gcBefore;
         // If we merge an old NodeId id, we must make sure that no further increment for that id are in an active memtable.
         // For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We
         // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
         // current 'stop all write during memtable switch' situation).
         this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
-        this.forceDeserialize = forceDeserialize;
-        keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(this.sstables);
+        deserializeRequired = forceDeserialize || !allLatestVersion(sstables);
+        Set<SSTableReader> overlappingSSTables = cfs.getOverlappingSSTables(sstables);
+        overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
+        keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables));
     }
 
     public String getKeyspace()
@@ -79,19 +80,21 @@ public class CompactionController
      */
     public boolean shouldPurge(DecoratedKey key)
     {
-        return !cfs.isKeyInRemainingSSTables(key, sstables);
+        List<SSTableReader> filteredSSTables = overlappingTree.search(new Interval(key, key));
+        for (SSTableReader sstable : filteredSSTables)
+        {
+            if (sstable.getBloomFilter().isPresent(key.key))
+                return false;
+        }
+        return true;
     }
 
-    public boolean needDeserialize()
+    private static boolean allLatestVersion(Iterable<SSTableReader> sstables)
     {
-        if (forceDeserialize)
-            return true;
-
         for (SSTableReader sstable : sstables)
             if (!sstable.descriptor.isLatestVersion)
-                return true;
-
-        return false;
+                return false;
+        return true;
     }
 
     public void invalidateCachedRow(DecoratedKey key)
@@ -128,7 +131,7 @@ public class CompactionController
 
         // in-memory echoedrow is only enabled if we think checking for the key's existence in the other sstables,
         // is going to be less expensive than simply de/serializing the row again
-        if (rows.size() == 1 && !needDeserialize()
+        if (rows.size() == 1 && !deserializeRequired
             && (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit() || !keyExistenceIsExpensive)
             && !shouldPurge(rows.get(0).getKey()))
         {