You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/28 16:51:28 UTC

cassandra git commit: Strong Leak Detection

Repository: cassandra
Updated Branches:
  refs/heads/trunk 6422e3476 -> a7f41345a


Strong Leak Detection

When -Dcassandra.debugrefcount=true is set, the globally extant Ref
instances are walked, looking for instances that cannot
ordinarily be reached and for circular self-references.
Any that are found are logged (self-reference loops at ERROR, leak candidates at WARN).

patch by benedict; reviewed by snazy for CASSANDRA-9423


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7f41345
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7f41345
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7f41345

Branch: refs/heads/trunk
Commit: a7f41345ac71826574d09d56ebd402d0ddc3ea3f
Parents: 6422e34
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Jun 26 14:27:28 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Jul 28 15:50:50 2015 +0100

----------------------------------------------------------------------
 .../io/compress/CompressionMetadata.java        |   6 +
 .../cassandra/io/sstable/IndexSummary.java      |   8 +
 .../io/sstable/format/SSTableReader.java        |  22 +-
 .../io/util/CompressedSegmentedFile.java        |   7 +
 .../apache/cassandra/io/util/SafeMemory.java    |   5 +
 .../cassandra/utils/AlwaysPresentFilter.java    |   6 +
 .../org/apache/cassandra/utils/BloomFilter.java |   7 +
 .../apache/cassandra/utils/concurrent/Ref.java  | 204 +++++++++++++++++--
 .../utils/concurrent/SharedCloseable.java       |   1 +
 .../utils/concurrent/SharedCloseableImpl.java   |   5 +
 .../concurrent/WrappedSharedCloseable.java      |  42 +++-
 .../org/apache/cassandra/utils/obs/IBitSet.java |   4 +
 .../cassandra/utils/obs/OffHeapBitSet.java      |   6 +
 .../apache/cassandra/utils/obs/OpenBitSet.java  |   7 +-
 14 files changed, 295 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 2975a3d..ff9ae64 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.SafeMemory;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Transactional;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 /**
  * Holds metadata about compressed file
@@ -158,6 +159,11 @@ public class CompressionMetadata
         return chunkOffsets.size();
     }
 
+    public void addTo(Ref.IdentityCollection identities) // contributes this metadata's chunkOffsets to the given collection
+    {
+        identities.add(chunkOffsets); // the only field reported by this class
+    }
+
     /**
      * Read offsets of the individual chunks from the given input.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 90c5b0e..371a243 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 
@@ -161,6 +162,13 @@ public class IndexSummary extends WrappedSharedCloseable
         entries.setByteBuffer(buffer, start, keySize);
     }
 
+    public void addTo(Ref.IdentityCollection identities) // contributes the inherited entries plus this summary's offsets and entries
+    {
+        super.addTo(identities); // superclass contribution first
+        identities.add(offsets);
+        identities.add(entries);
+    }
+
     public long getPosition(int index)
     {
         return entries.getLong(calculateEnd(index) - 8);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index bae0858..e1a9cdc 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -2072,19 +2072,25 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         this.readMeter = tidy.global.readMeter = readMeter;
     }
 
+    public void addTo(Ref.IdentityCollection identities) // contributes this reader and the components it holds to the given collection
+    {
+        identities.add(this); // the reader itself
+        identities.add(tidy.globalRef); // the globalRef held by this reader's tidy
+        dfile.addTo(identities); // each component below reports its own entries
+        ifile.addTo(identities);
+        bf.addTo(identities);
+        indexSummary.addTo(identities);
+
+    }
+
     /**
-     * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references
-     * the globally shared tidy, i.e.
-     *
-     * InstanceTidier => DescriptorTypeTitdy => GlobalTidy
+     * One instance per SSTableReader we create.
      *
-     * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be
-     * one GlobalTidy for one single logical sstable.
+     * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example),
+     * but there can only be one GlobalTidy for one single logical sstable.
      *
      * When the InstanceTidier cleansup, it releases its reference to its GlobalTidy; when all InstanceTidiers
      * for that type have run, the GlobalTidy cleans up.
-     *
-     * For ease, we stash a direct reference to our global tidier
      */
     private static final class InstanceTidier implements Tidy
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 2ae4781..95c61c1 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
 {
@@ -123,6 +124,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
         return new CompressedSegmentedFile(this);
     }
 
+    public void addTo(Ref.IdentityCollection identities) // contributes the superclass entries plus the compression metadata's
+    {
+        super.addTo(identities);
+        metadata.addTo(identities); // delegate to the metadata object
+    }
+
     public static class Builder extends SegmentedFile.Builder
     {
         protected final CompressedSequentialWriter writer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/io/util/SafeMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemory.java b/src/java/org/apache/cassandra/io/util/SafeMemory.java
index ad11472..e8cd54f 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemory.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemory.java
@@ -103,4 +103,9 @@ public class SafeMemory extends Memory implements SharedCloseable
         assert peer != 0 || size == 0 : ref.printDebugInfo();
         super.checkBounds(start, end);
     }
+
+    public void addTo(Ref.IdentityCollection identities) // contributes this memory's ref to the given collection
+    {
+        identities.add(ref);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 44d8f24..b046e84 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.utils;
 
+import org.apache.cassandra.utils.concurrent.Ref;
+
 public class AlwaysPresentFilter implements IFilter
 {
     public boolean isPresent(FilterKey key)
@@ -40,6 +42,10 @@ public class AlwaysPresentFilter implements IFilter
         return accumulate;
     }
 
+    public void addTo(Ref.IdentityCollection identities) // no-op: this filter adds nothing to the collection
+    {
+    }
+
     public long serializedSize() { return 0; }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index e1d9f20..ce6c638 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.utils;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
 import org.apache.cassandra.utils.obs.IBitSet;
 
@@ -151,4 +152,10 @@ public class BloomFilter extends WrappedSharedCloseable implements IFilter
     {
         return "BloomFilter[hashCount=" + hashCount + ";oldBfHashOrder=" + oldBfHashOrder + ";capacity=" + bitset.capacity() + ']';
     }
+
+    public void addTo(Ref.IdentityCollection identities) // contributes the inherited entries plus whatever the bitset reports
+    {
+        super.addTo(identities);
+        bitset.addTo(identities); // delegate to the IBitSet implementation
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index a044a24..522caca 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -2,17 +2,27 @@ package org.apache.cassandra.utils.concurrent;
 
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SafeMemory;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+import static java.util.Collections.emptyList;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
@@ -236,7 +246,7 @@ public final class Ref<T> implements RefCounted<T>
 
     // the object that manages the actual cleaning up; this does not reference the target object
     // so that we can detect when references are lost to the resource itself, and still cleanup afterwards
-    // the Tidy object MUST not contain any references to the object we are managing
+    // the Tidy object MUST NOT contain any references to the object we are managing
     static final class GlobalState
     {
         // we need to retain a reference to each of the PhantomReference instances
@@ -304,34 +314,194 @@ public final class Ref<T> implements RefCounted<T>
         }
     }
 
-    private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<GlobalState, Boolean>());
+    private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<>()); // concurrent set; iterated by both leak-detection tasks
     static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
     private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper"));
+    private static final ScheduledExecutorService STRONG_LEAK_DETECTOR = !DEBUG_ENABLED ? null : Executors.newScheduledThreadPool(1, new NamedThreadFactory("Strong-Reference-Leak-Detector")); // null unless debugging; one core thread, since a core size of 0 is discouraged for scheduled pools
     static
     {
-        EXEC.execute(new Runnable()
+        EXEC.execute(new ReferenceReaper()); // start draining referenceQueue
+        if (DEBUG_ENABLED) // STRONG_LEAK_DETECTOR is non-null exactly when this holds
+        {
+            STRONG_LEAK_DETECTOR.scheduleAtFixedRate(new Visitor(), 1, 15, TimeUnit.MINUTES); // self-reference check: first run after 1 minute, then every 15
+            STRONG_LEAK_DETECTOR.scheduleAtFixedRate(new StrongLeakDetector(), 2, 15, TimeUnit.MINUTES); // leak-candidate check: first run after 2 minutes, then every 15
+        }
+    }
+
+    static final class ReferenceReaper implements Runnable // drains referenceQueue, releasing every Ref.State it dequeues
+    {
+        public void run() // loops until an exception escapes, then resubmits itself to EXEC
+        {
-            public void run()
+            try
             {
+                while (true)
+                {
+                    Object obj = referenceQueue.remove(); // blocks until a reference has been enqueued
+                    if (obj instanceof Ref.State)
+                    {
+                        ((Ref.State) obj).release(true); // NOTE(review): the `true` flag's meaning is defined in Ref.State, not shown here
+                    }
+                }
+            }
+            catch (InterruptedException e)
+            { // interruption is swallowed; the finally block below resubmits the reaper
+            }
+            finally
+            {
+                EXEC.execute(this); // resubmit so reaping continues however the loop was left
+            }
+        }
+    }
+
+    static class Visitor implements Runnable
+    {
+        final Stack<Field> path = new Stack<>();
+        final Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
+        GlobalState visiting;
+
+        public void run()
+        {
+            try
+            {
+                for (GlobalState globalState : globallyExtant)
+                {
+                    if (globalState.tidy == null)
+                        continue;
+
+                    // do a graph exploration of the GlobalState, since it should be shallow; if it references itself, we have a problem
+                    path.clear();
+                    visited.clear();
+                    visited.add(globalState);
+                    visiting = globalState;
+                    visit(globalState.tidy);
+                }
+            }
+            catch (Throwable t)
+            {
+                t.printStackTrace();
+            }
+            finally
+            {
+                path.clear();
+                visited.clear();
+            }
+        }
+
+        void visit(final Object object)
+        {
+            for (Field field : getFields(object.getClass()))
+            {
+                path.push(field);
                 try
                 {
-                    while (true)
+                    Object child = field.get(object);
+                    if (child != null && visited.add(child))
+                    {
+                        visit(child);
+                    }
+                    else if (visiting == child)
                     {
-                        Object obj = referenceQueue.remove();
-                        if (obj instanceof Ref.State)
-                        {
-                            ((Ref.State) obj).release(true);
-                        }
+                        logger.error("Strong self-ref loop detected {}", path);
                     }
                 }
-                catch (InterruptedException e)
+                catch (IllegalAccessException e)
                 {
+                    NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, 5, TimeUnit.MINUTES, "Could not fully check for self-referential leaks", e);
                 }
-                finally
+                catch (StackOverflowError e)
                 {
-                    EXEC.execute(this);
+                    logger.error("Stackoverflow {}", path);
                 }
+                path.pop();
             }
-        });
+        }
+    }
+
+    static final Map<Class<?>, List<Field>> fieldMap = new HashMap<>(); // per-class cache for getFields; a plain HashMap with no synchronization of its own
+    static List<Field> getFields(Class<?> clazz) // returns the non-static, non-primitive fields of clazz and its superclasses, each made accessible
+    {
+        if (clazz == null || clazz == PhantomReference.class || clazz == Class.class || java.lang.reflect.Member.class.isAssignableFrom(clazz)) // types that are never traversed (null ends the superclass recursion)
+            return emptyList();
+        List<Field> fields = fieldMap.get(clazz);
+        if (fields != null)
+            return fields; // cache hit
+        fieldMap.put(clazz, fields = new ArrayList<>()); // the list is cached before it is filled
+        for (Field field : clazz.getDeclaredFields())
+        {
+            if (field.getType().isPrimitive() || Modifier.isStatic(field.getModifiers())) // only instance fields holding references are kept
+                continue;
+            field.setAccessible(true); // so private fields can be read later with Field.get
+            fields.add(field);
+        }
+        fields.addAll(getFields(clazz.getSuperclass())); // append the inherited fields
+        return fields;
+    }
+
+    public static class IdentityCollection // sink passed to addTo(...) methods; each add() ends up removing a Ref's Tidy from the candidate set
+    {
+        final Set<Tidy> candidates; // the caller's set, mutated in place (not copied)
+        public IdentityCollection(Set<Tidy> candidates)
+        {
+            this.candidates = candidates;
+        }
+
+        public void add(Ref<?> ref) // every other overload funnels into this one
+        {
+            candidates.remove(ref.state.globalState.tidy); // this Ref's Tidy is accounted for, so it is no longer a candidate
+        }
+        public void add(SelfRefCounted<?> ref)
+        {
+            add(ref.selfRef());
+        }
+        public void add(SharedCloseable ref)
+        {
+            if (ref instanceof SharedCloseableImpl) // any other SharedCloseable implementation is skipped here
+                add((SharedCloseableImpl)ref);
+        }
+        public void add(SharedCloseableImpl ref)
+        {
+            add(ref.ref);
+        }
+        public void add(Memory memory)
+        {
+            if (memory instanceof SafeMemory) // any other Memory is ignored
+                ((SafeMemory) memory).addTo(this);
+        }
+    }
+
+    private static class StrongLeakDetector implements Runnable // warns about Tidy objects that stay unaccounted for on two consecutive runs
+    {
+        Set<Tidy> candidates = new HashSet<>(); // candidates left over from the previous run; empty before the first run
+
+        public void run() // compares this pass's unexpected tidies with the previous pass's and logs the overlap
+        {
+            final Set<Tidy> candidates = Collections.newSetFromMap(new IdentityHashMap<>()); // this pass's candidates, compared by identity
+            for (GlobalState state : globallyExtant)
+                if (state.tidy != null) candidates.add(state.tidy); // skip null tidies: a null entry would make tidy.name() below throw, and a scheduled task that throws is never run again
+            removeExpected(candidates);
+            this.candidates.retainAll(candidates); // keep only what was also a candidate last time
+            if (!this.candidates.isEmpty())
+            {
+                List<String> names = new ArrayList<>();
+                for (Tidy tidy : this.candidates)
+                    names.add(tidy.name());
+                logger.warn("Strong reference leak candidates detected: {}", names);
+            }
+            this.candidates = candidates; // remembered for the next run
+        }
+
+        private void removeExpected(Set<Tidy> candidates) // strikes out every tidy reported by an sstable reader in any table's current view
+        {
+            final Ref.IdentityCollection expected = new Ref.IdentityCollection(candidates); // add() calls on this remove entries from candidates
+            for (Keyspace ks : Keyspace.all())
+            {
+                for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
+                {
+                    View view = cfs.getTracker().getView();
+                    for (SSTableReader reader : Iterables.concat(view.sstables, view.compacting)) // both the view's sstables and its compacting collections
+                        reader.addTo(expected);
+                }
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
index a3a1863..d643d1d 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
@@ -33,4 +33,5 @@ public interface SharedCloseable extends AutoCloseable
     public SharedCloseable sharedCopy();
     public Throwable close(Throwable accumulate);
 
+    public void addTo(Ref.IdentityCollection identities);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
index d85fd54..023df8f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
@@ -49,4 +49,9 @@ public abstract class SharedCloseableImpl implements SharedCloseable
     {
         return ref.ensureReleased(accumulate);
     }
+
+    public void addTo(Ref.IdentityCollection identities) // contributes this object's ref to the given collection
+    {
+        identities.add(ref);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
index 96e226c..0eefae3 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
@@ -20,6 +20,11 @@ package org.apache.cassandra.utils.concurrent;
 
 import java.util.Arrays;
 
+import org.apache.cassandra.utils.Throwables;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
 /**
  * An implementation of SharedCloseable that wraps a normal AutoCloseable,
  * ensuring its close method is only called when all instances of SharedCloseable have been
@@ -35,20 +40,39 @@ public abstract class WrappedSharedCloseable extends SharedCloseableImpl
 
     public WrappedSharedCloseable(final AutoCloseable[] closeable)
     {
-        super(new RefCounted.Tidy()
+        super(new Tidy(closeable));
+        wrapped = closeable;
+    }
+
+    static final class Tidy implements RefCounted.Tidy // closes every wrapped AutoCloseable; a static nested class holding only the array
+    {
+        final AutoCloseable[] closeable; // resources closed by tidy(); the array is stored as given, not copied
+        Tidy(AutoCloseable[] closeable)
         {
-            public void tidy() throws Exception
+            this.closeable = closeable;
+        }
+
+        public void tidy() throws Exception // attempts to close every element even if an earlier close() fails
+        {
+            Throwable fail = null; // failures accumulated so far
+            for (AutoCloseable c : closeable)
             {
-                for (AutoCloseable c : closeable)
+                try
+                {
                     c.close();
+                }
+                catch (Throwable t) // keep going so the remaining resources are still closed
+                {
+                    fail = merge(fail, t); // Throwables.merge (not shown); presumably combines the failures - TODO confirm
+                }
             }
+            maybeFail(fail); // Throwables.maybeFail (not shown); presumably rethrows if anything failed - TODO confirm
+        }
 
-            public String name()
-            {
-                return Arrays.toString(closeable);
-            }
-        });
-        wrapped = closeable;
+        public String name() // string form of the wrapped array
+        {
+            return Arrays.toString(closeable);
+        }
     }
 
     protected WrappedSharedCloseable(WrappedSharedCloseable copy)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 3b32fdb..15ff361 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -21,6 +21,8 @@ import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.cassandra.utils.concurrent.Ref;
+
 public interface IBitSet extends Closeable
 {
     public long capacity();
@@ -55,4 +57,6 @@ public interface IBitSet extends Closeable
      * @return the amount of memory in bytes used off heap
      */
     public long offHeapSize();
+
+    public void addTo(Ref.IdentityCollection identities);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 00c3e67..8593a11 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 /**
  * Off-heap bitset,
@@ -67,6 +68,11 @@ public class OffHeapBitSet implements IBitSet
         return bytes.size();
     }
 
+    public void addTo(Ref.IdentityCollection identities) // contributes this bitset's backing bytes to the given collection
+    {
+        identities.add(bytes);
+    }
+
     public boolean get(long index)
     {
         long i = index >> 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f41345/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index dc48e5e..82e6929 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.concurrent.Ref;
 
 /**
  * <p>
@@ -115,7 +116,11 @@ public class OpenBitSet implements IBitSet
       return 0;
   }
 
- /**
+    public void addTo(Ref.IdentityCollection identities) // no-op: this bitset adds nothing to the collection
+    {
+    }
+
+    /**
   * Returns the current capacity of this set.  Included for
   * compatibility.  This is *not* equal to {@link #cardinality}
   */