You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/10/31 17:23:30 UTC

git commit: off-heap bloom filters for row keys patch by vijay; reviewed by jbellis for CASSANDRA-4865

Updated Branches:
  refs/heads/trunk c34ecbf68 -> dc37dea74


off-heap bloom filters for row keys
patch by vijay; reviewed by jbellis for CASSANDRA-4865


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc37dea7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc37dea7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc37dea7

Branch: refs/heads/trunk
Commit: dc37dea745fe89d70819d649c823d9bfcb0d7577
Parents: c34ecbf
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 31 10:54:47 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 31 11:22:47 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/ColumnIndex.java  |    2 +-
 .../org/apache/cassandra/db/RowIndexEntry.java     |    2 +-
 .../apache/cassandra/io/sstable/IndexHelper.java   |    2 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |    4 +-
 .../apache/cassandra/io/sstable/SSTableWriter.java |    4 +-
 src/java/org/apache/cassandra/io/util/Memory.java  |   20 ++
 .../org/apache/cassandra/utils/BloomFilter.java    |   25 +--
 .../cassandra/utils/BloomFilterSerializer.java     |   51 +----
 src/java/org/apache/cassandra/utils/Filter.java    |    3 +-
 .../org/apache/cassandra/utils/FilterFactory.java  |   36 ++--
 .../apache/cassandra/utils/LegacyBloomFilter.java  |    6 +
 .../apache/cassandra/utils/Murmur2BloomFilter.java |   16 +-
 .../apache/cassandra/utils/Murmur3BloomFilter.java |   16 +-
 .../org/apache/cassandra/utils/obs/IBitSet.java    |   52 +++++
 .../apache/cassandra/utils/obs/OffHeapBitSet.java  |  160 +++++++++++++++
 .../org/apache/cassandra/utils/obs/OpenBitSet.java |   60 +++++-
 .../org/apache/cassandra/utils/LongBitSetTest.java |  133 ++++++++++++
 .../cassandra/utils/LongBloomFilterTest.java       |    6 +-
 .../org/apache/cassandra/utils/BitSetTest.java     |  148 +++++++++++++
 .../apache/cassandra/utils/BloomFilterTest.java    |    9 +-
 .../cassandra/utils/LegacyBloomFilterTest.java     |    2 +-
 .../apache/cassandra/utils/SerializationsTest.java |   14 +-
 23 files changed, 657 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 11aaea1..7f4a728 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-beta2
+ * off-heap bloom filters for row keys (CASSANDRA-4865)
  * add extension point for sstable components (CASSANDRA-4049)
  * improve tracing output (CASSANDRA-4852, 4862)
  * make TRACE verb droppable (CASSANDRA-4672)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 35ee899..946c4f4 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -36,7 +36,7 @@ public class ColumnIndex
 
     private ColumnIndex(int estimatedColumnCount)
     {
-        this(new ArrayList<IndexHelper.IndexInfo>(), FilterFactory.getFilter(estimatedColumnCount, 4));
+        this(new ArrayList<IndexHelper.IndexInfo>(), FilterFactory.getFilter(estimatedColumnCount, 4, false));
     }
 
     private ColumnIndex(List<IndexHelper.IndexInfo> columnsIndex, Filter bloomFilter)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index b7660e5..a3701f8 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -119,7 +119,7 @@ public class RowIndexEntry
                     List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<IndexHelper.IndexInfo>(entries);
                     for (int i = 0; i < entries; i++)
                         columnsIndex.add(IndexHelper.IndexInfo.deserialize(dis));
-                    Filter bf = FilterFactory.deserialize(dis, version.filterType);
+                    Filter bf = FilterFactory.deserialize(dis, version.filterType, false);
                     return new IndexedEntry(position, delInfo, columnsIndex, bf);
                 }
                 else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 29e076a..a87ecf7 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -130,7 +130,7 @@ public class IndexHelper
         ByteBuffer bytes = file.readBytes(size);
 
         DataInputStream stream = new DataInputStream(ByteBufferUtil.inputStream(bytes));
-        return FilterFactory.deserialize(stream, type);
+        return FilterFactory.deserialize(stream, type, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 7957134..812a475 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -330,7 +330,7 @@ public class SSTableReader extends SSTable
         try
         {
             stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
-            bf = FilterFactory.deserialize(stream, descriptor.version.filterType);
+            bf = FilterFactory.deserialize(stream, descriptor.version.filterType, true);
         }
         finally
         {
@@ -899,6 +899,8 @@ public class SSTableReader extends SSTable
             dfile.cleanup();
 
             deletingTask.schedule();
+            // close the BF so its bitset memory is released (off-heap bitsets are freed explicitly).
+            FileUtils.closeQuietly(bf);
         }
         assert references.get() >= 0 : "Reference counter " +  references.get() + " for " + dfile.path;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index c17de4c..2627a77 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -445,8 +445,8 @@ public class SSTableWriter extends SSTable
                 logger.error("Bloom filter FP chance of zero isn't supposed to happen");
                 fpChance = null;
             }
-            bf = fpChance == null ? FilterFactory.getFilter(keyCount, 15)
-                                  : FilterFactory.getFilter(keyCount, fpChance);
+            bf = fpChance == null ? FilterFactory.getFilter(keyCount, 15, true)
+                                  : FilterFactory.getFilter(keyCount, fpChance, true);
         }
 
         public void append(DecoratedKey key, RowIndexEntry indexEntry)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index faef564..25f5caf 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -66,6 +66,13 @@ public class Memory
         unsafe.putByte(peer + offset, b);
     }
 
+    public void setMemory(long offset, long bytes, byte b)
+    {
+        // check if the last element will fit into the memory
+        checkPosition(offset + bytes - 1);
+        unsafe.setMemory(peer + offset, bytes, b);
+    }
+
     /**
      * Transfers count bytes from buffer to Memory
      *
@@ -139,5 +146,18 @@ public class Memory
     {
         return size;
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (!(o instanceof Memory))
+            return false;
+        Memory b = (Memory) o;
+        if (peer == b.peer && size == b.size)
+            return true;
+        return false;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 3ca62e5..469763a 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -17,23 +17,16 @@
  */
 package org.apache.cassandra.utils;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.apache.cassandra.utils.obs.IBitSet;
 
 public abstract class BloomFilter extends Filter
 {
-    private static final int EXCESS = 20;
+    public final IBitSet bitset;
 
-    public final OpenBitSet bitset;
-
-    BloomFilter(int hashes, long numElements, int bucketsPer)
-    {
-        hashCount = hashes;
-        bitset = new OpenBitSet(numElements * bucketsPer + EXCESS);
-    }
-
-    BloomFilter(int hashes, OpenBitSet bitset)
+    BloomFilter(int hashes, IBitSet bitset)
     {
         this.hashCount = hashes;
         this.bitset = bitset;
@@ -41,7 +34,7 @@ public abstract class BloomFilter extends Filter
 
     private long[] getHashBuckets(ByteBuffer key)
     {
-        return getHashBuckets(key, hashCount, bitset.size());
+        return getHashBuckets(key, hashCount, bitset.capacity());
     }
 
     protected abstract long[] hash(ByteBuffer b, int position, int remaining, long seed);
@@ -84,6 +77,12 @@ public abstract class BloomFilter extends Filter
 
     public void clear()
     {
-        bitset.clear(0, bitset.size());
+        bitset.clear();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        bitset.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
index 68997c9..6b8b355 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
@@ -23,46 +23,31 @@ import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.utils.obs.IBitSet;
+import org.apache.cassandra.utils.obs.OffHeapBitSet;
 import org.apache.cassandra.utils.obs.OpenBitSet;
 
 abstract class BloomFilterSerializer implements ISerializer<BloomFilter>
 {
     public void serialize(BloomFilter bf, DataOutput dos) throws IOException
     {
-        int bitLength = bf.bitset.getNumWords();
-        int pageSize = bf.bitset.getPageSize();
-        int pageCount = bf.bitset.getPageCount();
-
         dos.writeInt(bf.getHashCount());
-        dos.writeInt(bitLength);
-
-        for (int p = 0; p < pageCount; p++)
-        {
-            long[] bits = bf.bitset.getPage(p);
-            for (int i = 0; i < pageSize && bitLength-- > 0; i++)
-                dos.writeLong(bits[i]);
-        }
+        bf.bitset.serialize(dos);
     }
 
     public BloomFilter deserialize(DataInput dis) throws IOException
     {
-        int hashes = dis.readInt();
-        long bitLength = dis.readInt();
-        OpenBitSet bs = new OpenBitSet(bitLength << 6);
-        int pageSize = bs.getPageSize();
-        int pageCount = bs.getPageCount();
-
-        for (int p = 0; p < pageCount; p++)
-        {
-            long[] bits = bs.getPage(p);
-            for (int i = 0; i < pageSize && bitLength-- > 0; i++)
-                bits[i] = dis.readLong();
-        }
+        return deserialize(dis, false);
+    }
 
+    public BloomFilter deserialize(DataInput dis, boolean offheap) throws IOException
+    {
+        int hashes = dis.readInt();
+        IBitSet bs = offheap ? OffHeapBitSet.deserialize(dis) : OpenBitSet.deserialize(dis);
         return createFilter(hashes, bs);
     }
 
-    protected abstract BloomFilter createFilter(int hashes, OpenBitSet bs);
+    protected abstract BloomFilter createFilter(int hashes, IBitSet bs);
 
     /**
      * Calculates a serialized size of the given Bloom Filter
@@ -74,20 +59,8 @@ abstract class BloomFilterSerializer implements ISerializer<BloomFilter>
      */
     public long serializedSize(BloomFilter bf, TypeSizes typeSizes)
     {
-        int bitLength = bf.bitset.getNumWords();
-        int pageSize = bf.bitset.getPageSize();
-        int pageCount = bf.bitset.getPageCount();
-
-        int size = 0;
-        size += typeSizes.sizeof(bf.getHashCount()); // hash count
-        size += typeSizes.sizeof(bitLength); // length
-
-        for (int p = 0; p < pageCount; p++)
-        {
-            long[] bits = bf.bitset.getPage(p);
-            for (int i = 0; i < pageSize && bitLength-- > 0; i++)
-                size += typeSizes.sizeof(bits[i]); // bucket
-        }
+        int size = typeSizes.sizeof(bf.getHashCount()); // hash count
+        size += bf.bitset.serializedSize(typeSizes);
         return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Filter.java b/src/java/org/apache/cassandra/utils/Filter.java
index f7ce1f3..ea98401 100644
--- a/src/java/org/apache/cassandra/utils/Filter.java
+++ b/src/java/org/apache/cassandra/utils/Filter.java
@@ -17,9 +17,10 @@
  */
 package org.apache.cassandra.utils;
 
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 
-public abstract class Filter
+public abstract class Filter implements Closeable
 {
     int hashCount;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 2c9bcf4..3eae519 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -22,6 +22,9 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.obs.IBitSet;
+import org.apache.cassandra.utils.obs.OffHeapBitSet;
+import org.apache.cassandra.utils.obs.OpenBitSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +33,7 @@ public class FilterFactory
 {
     private static final Logger logger = LoggerFactory.getLogger(FilterFactory.class);
     private static final TypeSizes TYPE_SIZES = TypeSizes.NATIVE;
+    private static final long BITSET_EXCESS = 20;
 
     public enum Type
     {
@@ -57,16 +61,16 @@ public class FilterFactory
         }
     }
 
-    public static Filter deserialize(DataInput input, Type type) throws IOException
+    public static Filter deserialize(DataInput input, Type type, boolean offheap) throws IOException
     {
         switch (type)
         {
             case SHA:
                 return LegacyBloomFilter.serializer.deserialize(input);
             case MURMUR2:
-                return Murmur2BloomFilter.serializer.deserialize(input);
+                return Murmur2BloomFilter.serializer.deserialize(input, offheap);
             default:
-                return Murmur3BloomFilter.serializer.deserialize(input);
+                return Murmur3BloomFilter.serializer.deserialize(input, offheap);
         }
     }
 
@@ -92,13 +96,13 @@ public class FilterFactory
      * @return A BloomFilter with the lowest practical false positive
      *         probability for the given number of elements.
      */
-    public static Filter getFilter(long numElements, int targetBucketsPerElem)
+    public static Filter getFilter(long numElements, int targetBucketsPerElem, boolean offheap)
     {
-        return getFilter(numElements, targetBucketsPerElem, Type.MURMUR3);
+        return getFilter(numElements, targetBucketsPerElem, Type.MURMUR3, offheap);
     }
 
     // helper method for test.
-    static Filter getFilter(long numElements, int targetBucketsPerElem, Type type)
+    static Filter getFilter(long numElements, int targetBucketsPerElem, Type type, boolean offheap)
     {
         int maxBucketsPerElement = Math.max(1, BloomCalculations.maxBucketsPerElement(numElements));
         int bucketsPerElement = Math.min(targetBucketsPerElem, maxBucketsPerElement);
@@ -107,7 +111,7 @@ public class FilterFactory
             logger.warn(String.format("Cannot provide an optimal BloomFilter for %d elements (%d/%d buckets per element).", numElements, bucketsPerElement, targetBucketsPerElem));
         }
         BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement);
-        return createFilter(spec.K, numElements, spec.bucketsPerElement, type);
+        return createFilter(spec.K, numElements, spec.bucketsPerElement, type, offheap);
     }
 
     /**
@@ -117,33 +121,35 @@ public class FilterFactory
      *         Asserts that the given probability can be satisfied using this
      *         filter.
      */
-    public static Filter getFilter(long numElements, double maxFalsePosProbability)
+    public static Filter getFilter(long numElements, double maxFalsePosProbability, boolean offheap)
     {
-        return getFilter(numElements, maxFalsePosProbability, Type.MURMUR3);
+        return getFilter(numElements, maxFalsePosProbability, Type.MURMUR3, offheap);
     }
 
     // helper method for test.
-    static Filter getFilter(long numElements, double maxFalsePosProbability, Type type)
+    static Filter getFilter(long numElements, double maxFalsePosProbability, Type type, boolean offheap)
     {
         assert maxFalsePosProbability <= 1.0 : "Invalid probability";
         int bucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
         BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement, maxFalsePosProbability);
-        return createFilter(spec.K, numElements, spec.bucketsPerElement, type);
+        return createFilter(spec.K, numElements, spec.bucketsPerElement, type, offheap);
     }
 
-    private static Filter createFilter(int hash, long numElements, int bucketsPer, Type type)
+    private static Filter createFilter(int hash, long numElements, int bucketsPer, Type type, boolean offheap)
     {
+        long numBits = (numElements * bucketsPer) + BITSET_EXCESS;
+        IBitSet bitset = offheap ? new OffHeapBitSet(numBits) : new OpenBitSet(numBits);
         switch (type)
         {
             case MURMUR2:
-              return new Murmur2BloomFilter(hash, numElements, bucketsPer);
+              return new Murmur2BloomFilter(hash, bitset);
             default:
-              return new Murmur3BloomFilter(hash, numElements, bucketsPer);
+              return new Murmur3BloomFilter(hash, bitset);
         }
     }
 
     public static BloomFilter emptyFilter()
     {
-        return new Murmur3BloomFilter(0, 0, 0);
+        return new Murmur3BloomFilter(0, new OpenBitSet(BITSET_EXCESS));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java b/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java
index 6f7269e..a50e2c8 100644
--- a/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.utils;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 
@@ -160,4 +161,9 @@ public class LegacyBloomFilter extends Filter
     public BitSet getBitSet(){
       return filter;
     }
+
+    public void close() throws IOException
+    {
+        // no-op: this filter is backed by a java.util.BitSet, so there is nothing to release
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java
index df5a160..1c20dd1 100644
--- a/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java
@@ -19,19 +19,13 @@ package org.apache.cassandra.utils;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.apache.cassandra.utils.obs.IBitSet;
 
 public class Murmur2BloomFilter extends BloomFilter
 {
-    public static final ISerializer<BloomFilter> serializer = new Murmur2BloomFilterSerializer();
+    public static final Murmur2BloomFilterSerializer serializer = new Murmur2BloomFilterSerializer();
 
-    Murmur2BloomFilter(int hashes, long numElements, int bucketsPer)
-    {
-        super(hashes, numElements, bucketsPer);
-    }
-
-    private Murmur2BloomFilter(int hashes, OpenBitSet bs)
+    public Murmur2BloomFilter(int hashes, IBitSet bs)
     {
         super(hashes, bs);
     }
@@ -43,9 +37,9 @@ public class Murmur2BloomFilter extends BloomFilter
         return (new long[] { hash1, hash2 });
     }
 
-    private static class Murmur2BloomFilterSerializer extends BloomFilterSerializer
+    public static class Murmur2BloomFilterSerializer extends BloomFilterSerializer
     {
-        protected BloomFilter createFilter(int hashes, OpenBitSet bs)
+        protected BloomFilter createFilter(int hashes, IBitSet bs)
         {
             return new Murmur2BloomFilter(hashes, bs);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
index 304842a..ebd506c 100644
--- a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
@@ -19,19 +19,13 @@ package org.apache.cassandra.utils;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.apache.cassandra.utils.obs.IBitSet;
 
 public class Murmur3BloomFilter extends BloomFilter
 {
-    public static final ISerializer<BloomFilter> serializer = new Murmur3BloomFilterSerializer();
+    public static final Murmur3BloomFilterSerializer serializer = new Murmur3BloomFilterSerializer();
 
-    Murmur3BloomFilter(int hashes, long numElements, int bucketsPer)
-    {
-        super(hashes, numElements, bucketsPer);
-    }
-
-    private Murmur3BloomFilter(int hashes, OpenBitSet bs)
+    public Murmur3BloomFilter(int hashes, IBitSet bs)
     {
         super(hashes, bs);
     }
@@ -41,9 +35,9 @@ public class Murmur3BloomFilter extends BloomFilter
         return MurmurHash.hash3_x64_128(b, b.position(), b.remaining(), seed);
     }
 
-    private static class Murmur3BloomFilterSerializer extends BloomFilterSerializer
+    public static class Murmur3BloomFilterSerializer extends BloomFilterSerializer
     {
-        protected BloomFilter createFilter(int hashes, OpenBitSet bs)
+        protected BloomFilter createFilter(int hashes, IBitSet bs)
         {
             return new Murmur3BloomFilter(hashes, bs);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
new file mode 100644
index 0000000..c5a2fb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.obs;
+
+import java.io.Closeable;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+
+public interface IBitSet extends Closeable
+{
+    public long capacity();
+
+    /**
+     * Returns true or false for the specified bit index. The index should be
+     * less than the capacity.
+     */
+    public boolean get(long index);
+
+    /**
+     * Sets the bit at the specified index. The index should be less than the
+     * capacity.
+     */
+    public void set(long index);
+
+    /**
+     * Clears the bit at the specified index. The index should be less than the capacity.
+     */
+    public void clear(long index);
+
+    public void serialize(DataOutput dos) throws IOException;
+
+    public long serializedSize(TypeSizes type);
+
+    public void clear();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
new file mode 100644
index 0000000..1733a81
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.obs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.cache.RefCountedMemory;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.Memory;
+
+/**
+ * Off-heap bitset,
+ * file-compatible with OpenBitSet.
+ */
+public class OffHeapBitSet implements IBitSet
+{
+    private final Memory bytes;
+
+    public OffHeapBitSet(long numBits)
+    {
+        // OpenBitSet.bits2words calculation is there for backward compatibility.
+        int byteCount = OpenBitSet.bits2words(numBits) * 8;
+        bytes = RefCountedMemory.allocate(byteCount);
+        // flush/clear the existing memory.
+        clear();
+    }
+
+    private OffHeapBitSet(Memory bytes)
+    {
+        this.bytes = bytes;
+    }
+
+    public long capacity()
+    {
+        return bytes.size() * 8;
+    }
+
+    public boolean get(long index)
+    {
+        long i = index >> 3;
+        long bit = index & 0x7;
+        int bitmask = 0x1 << bit;
+        return ((bytes.getByte(i) & 0xFF) & bitmask) != 0;
+    }
+
+    public void set(long index)
+    {
+        long i = index >> 3;
+        long bit = index & 0x7;
+        int bitmask = 0x1 << bit;
+        bytes.setByte(i, (byte) (bitmask | bytes.getByte(i)));
+    }
+
+    public void set(long offset, byte b)
+    {
+        bytes.setByte(offset, b);
+    }
+
+    public void clear(long index)
+    {
+        long i = index >> 3;
+        long bit = index & 0x7;
+        int bitmask = 0x1 << bit;
+        int nativeByte = (bytes.getByte(i) & 0xFF);
+        nativeByte &= ~bitmask;
+        bytes.setByte(i, (byte) nativeByte);
+    }
+
+    public void clear()
+    {
+        bytes.setMemory(0, bytes.size(), (byte) 0);
+    }
+
+    public void serialize(DataOutput dos) throws IOException
+    {
+        dos.writeInt((int) (bytes.size() / 8));
+        for (long i = 0; i < bytes.size();)
+        {
+            long value = ((bytes.getByte(i++) & 0xff) << 0) 
+                       + ((bytes.getByte(i++) & 0xff) << 8)
+                       + ((bytes.getByte(i++) & 0xff) << 16)
+                       + ((long) (bytes.getByte(i++) & 0xff) << 24)
+                       + ((long) (bytes.getByte(i++) & 0xff) << 32)
+                       + ((long) (bytes.getByte(i++) & 0xff) << 40)
+                       + ((long) (bytes.getByte(i++) & 0xff) << 48)
+                       + ((long) bytes.getByte(i++) << 56);
+            dos.writeLong(value);
+        }
+    }
+
+    public long serializedSize(TypeSizes type)
+    {
+        return type.sizeof((int) bytes.size()) + bytes.size();
+    }
+
+    public static OffHeapBitSet deserialize(DataInput dis) throws IOException
+    {
+        int byteCount = dis.readInt() * 8;
+        Memory memory = RefCountedMemory.allocate(byteCount);
+        for (int i = 0; i < byteCount;)
+        {
+            long v = dis.readLong();
+            memory.setByte(i++, (byte) (v >>> 0));
+            memory.setByte(i++, (byte) (v >>> 8));
+            memory.setByte(i++, (byte) (v >>> 16));
+            memory.setByte(i++, (byte) (v >>> 24));
+            memory.setByte(i++, (byte) (v >>> 32));
+            memory.setByte(i++, (byte) (v >>> 40));
+            memory.setByte(i++, (byte) (v >>> 48));
+            memory.setByte(i++, (byte) (v >>> 56));
+        }
+        return new OffHeapBitSet(memory);
+    }
+
+    public void close() throws IOException
+    {
+        bytes.free();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (!(o instanceof OffHeapBitSet))
+            return false;
+        OffHeapBitSet b = (OffHeapBitSet) o;
+        return bytes.equals(b.bytes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        // Similar to open bitset.
+        long h = 0;
+        for (long i = bytes.size(); --i >= 0;)
+        {
+            h ^= bytes.getByte(i);
+            h = (h << 1) | (h >>> 63); // rotate left
+        }
+        return (int) ((h >> 32) ^ h) + 0x98761234;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 1ddbe8f..4fce3f8 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.utils.obs;
 
 import java.util.Arrays;
-import java.io.Serializable;
-import java.util.BitSet;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
 
 /**
  * An "open" BitSet implementation that allows direct access to the arrays of words
@@ -43,7 +46,8 @@ import java.util.BitSet;
  * class, use <code>java.util.BitSet</code>.
  */
 
-public class OpenBitSet implements Serializable {
+public class OpenBitSet implements IBitSet
+{
   /**
    * We break the bitset up into multiple arrays to avoid promotion failure caused by attempting to allocate
    * large, contiguous arrays (CASSANDRA-2466).  All sub-arrays but the last are uniformly PAGE_SIZE words;
@@ -302,7 +306,7 @@ public class OpenBitSet implements Serializable {
     int newLen= Math.min(this.wlen,other.wlen);
     long[][] thisArr = this.bits;
     long[][] otherArr = other.bits;
-    int thisPageSize = this.PAGE_SIZE;
+    int thisPageSize = PAGE_SIZE;
     int otherPageSize = other.PAGE_SIZE;
     // testing against zero can be more efficient
     int pos=newLen;
@@ -383,6 +387,54 @@ public class OpenBitSet implements Serializable {
     return (int)((h>>32) ^ h) + 0x98761234;
   }
 
+  public void close() throws IOException {
+    // No-op: the words are held in ordinary long[] pages, so cleanup is left to the GC.
+  }
+
+  /** Writes the word count as an int, then each of those 64-bit words in page order. */
+  public void serialize(DataOutput dos) throws IOException {
+    int wordsLeft = getNumWords();
+    final int wordsPerPage = getPageSize();
+    final int pages = getPageCount();
+    dos.writeInt(wordsLeft);
+    for (int page = 0; page < pages; page++) {
+      long[] words = getPage(page);
+      for (int slot = 0; slot < wordsPerPage && wordsLeft > 0; slot++, wordsLeft--) {
+        dos.writeLong(words[slot]);
+      }
+    }
 }
 
+  /** Sums type.sizeof() over the word count and over every word that serialize() writes. */
+  public long serializedSize(TypeSizes type) {
+    int wordsLeft = getNumWords();
+    final int wordsPerPage = getPageSize();
+    final int pages = getPageCount();
+    long total = type.sizeof(wordsLeft); // the leading word count
+    for (int page = 0; page < pages; page++) {
+      long[] words = getPage(page);
+      for (int slot = 0; slot < wordsPerPage && wordsLeft > 0; slot++, wordsLeft--)
+        total += type.sizeof(words[slot]); // one serialized word
+    }
+    return total;
+  }
+
+  public void clear() {
+    clear(0, capacity()); // delegates to the ranged clear, passing 0 and capacity() as the bounds
+  }
+
+  /** Reads an int word count followed by that many longs, the layout serialize() writes. */
+  public static OpenBitSet deserialize(DataInput dis) throws IOException {
+    long wordsLeft = dis.readInt();
+    OpenBitSet result = new OpenBitSet(wordsLeft << 6); // word count * 64
+    final int wordsPerPage = result.getPageSize();
+    final int pages = result.getPageCount();
 
+    for (int page = 0; page < pages; page++) {
+      long[] words = result.getPage(page);
+      for (int slot = 0; slot < wordsPerPage && wordsLeft > 0; slot++, wordsLeft--)
+        words[slot] = dis.readLong();
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/long/org/apache/cassandra/utils/LongBitSetTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBitSetTest.java b/test/long/org/apache/cassandra/utils/LongBitSetTest.java
new file mode 100644
index 0000000..7941fae
--- /dev/null
+++ b/test/long/org/apache/cassandra/utils/LongBitSetTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.cassandra.utils.obs.OffHeapBitSet;
+import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LongBitSetTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(LongBitSetTest.class);
+    private static final Random random = new Random();
+
+    /** With a random coin flip, sets bit {@code index} in both bit sets or leaves both untouched. */
+    public void populateRandom(OffHeapBitSet offbs, OpenBitSet obs, long index)
+    {
+        if (random.nextBoolean())
+        {
+            offbs.set(index);
+            obs.set(index);
+        }
+    }
+
+    /** Fails the test, naming the index, when the two bit sets disagree on bit {@code index}. */
+    public void compare(OffHeapBitSet offbs, OpenBitSet obs, long index)
+    {
+        if (offbs.get(index) != obs.get(index))
+            Assert.fail("bit sets disagree at index " + index);
+    }
+
+    @Test
+    public void testBitSetOperations() throws Exception
+    {
+        long size_to_test = Integer.MAX_VALUE / 40;
+        long size_and_excess = size_to_test + 20;
+        OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess);
+        OpenBitSet obs = new OpenBitSet(size_and_excess);
+        for (long i = 0; i < size_to_test; i++)
+            populateRandom(offbs, obs, i);
+
+        for (long i = 0; i < size_to_test; i++)
+            compare(offbs, obs, i);
+        offbs.close(); // release the off-heap allocation explicitly
+    }
+
+    @Test
+    public void timeit() throws Exception
+    {
+        long size_to_test = Integer.MAX_VALUE / 10; // about 214 million
+        long size_and_excess = size_to_test + 20;
+
+        OpenBitSet obs = new OpenBitSet(size_and_excess);
+        OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess);
+        logger.info("||Open BS set's|Open BS get's|Open BS clear's|Offheap BS set's|Offheap BS get's|Offheap BS clear's|");
+        loopOnce(obs, offbs, size_to_test);
+        offbs.close();
+    }
+
+    /** Times set, get and clear over the first size_to_test bits of each bit set and logs one table row. */
+    public void loopOnce(OpenBitSet obs, OffHeapBitSet offbs, long size_to_test)
+    {
+        StringBuilder buffer = new StringBuilder();
+        // start off fresh.
+        System.gc();
+        long start = System.currentTimeMillis();
+        for (long i = 0; i < size_to_test; i++)
+            obs.set(i);
+        buffer.append("||").append(System.currentTimeMillis() - start);
+
+        start = System.currentTimeMillis();
+        for (long i = 0; i < size_to_test; i++)
+            obs.get(i);
+        buffer.append("|").append(System.currentTimeMillis() - start);
+
+        start = System.currentTimeMillis();
+        for (long i = 0; i < size_to_test; i++)
+            obs.clear(i);
+        buffer.append("|").append(System.currentTimeMillis() - start);
+
+        System.gc();
+        start = System.currentTimeMillis();
+        for (long i = 0; i < size_to_test; i++)
+            offbs.set(i);
+        buffer.append("|").append(System.currentTimeMillis() - start);
+
+        start = System.currentTimeMillis();
+        for (long i = 0; i < size_to_test; i++)
+            offbs.get(i);
+        buffer.append("|").append(System.currentTimeMillis() - start);
+
+        start = System.currentTimeMillis();
+        for (long i = 0; i < size_to_test; i++)
+            offbs.clear(i);
+        buffer.append("|").append(System.currentTimeMillis() - start).append("|");
+        logger.info(buffer.toString());
+    }
+
+    /** Repeats the timing pass on the same bit sets, presumably so JIT warm-up does not skew the numbers. */
+    @Test
+    public void loopIt() throws Exception
+    {
+        long size_to_test = Integer.MAX_VALUE / 10; // about 214 million
+        long size_and_excess = size_to_test + 20;
+
+        OpenBitSet obs = new OpenBitSet(size_and_excess);
+        OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess);
+        for (int i = 0; i < 10; i++)
+            // 10 times to do approx 2B keys each.
+            loopOnce(obs, offbs, size_to_test);
+        offbs.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java b/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java
index d4a4c34..06ad642 100644
--- a/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java
+++ b/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java
@@ -34,7 +34,7 @@ public class LongBloomFilterTest
     public void testBigInt(FilterFactory.Type type)
     {
         int size = 10 * 1000 * 1000;
-        Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type);
+        Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false);
         double fp = FilterTestHelper.testFalsePositives(bf, new KeyGenerator.IntGenerator(size),
                                                             new KeyGenerator.IntGenerator(size, size * 2));
         logger.info("Bloom filter false positive: {}", fp);
@@ -43,7 +43,7 @@ public class LongBloomFilterTest
     public void testBigRandom(FilterFactory.Type type)
     {
         int size = 10 * 1000 * 1000;
-        Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type);
+        Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false);
         double fp = FilterTestHelper.testFalsePositives(bf, new KeyGenerator.RandomStringGenerator(new Random().nextInt(), size),
                                                             new KeyGenerator.RandomStringGenerator(new Random().nextInt(), size));
         logger.info("Bloom filter false positive: {}", fp);
@@ -52,7 +52,7 @@ public class LongBloomFilterTest
     public void timeit(FilterFactory.Type type)
     {
         int size = 300 * FilterTestHelper.ELEMENTS;
-        Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type);
+        Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false);
         double sumfp = 0;
         for (int i = 0; i < 10; i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/BitSetTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BitSetTest.java b/test/unit/org/apache/cassandra/utils/BitSetTest.java
new file mode 100644
index 0000000..9684131
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/BitSetTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.KeyGenerator.WordGenerator;
+import org.apache.cassandra.utils.obs.IBitSet;
+import org.apache.cassandra.utils.obs.OffHeapBitSet;
+import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import static junit.framework.Assert.assertEquals;
+
+public class BitSetTest
+{
+    /** Test bitsets in a "real-world" environment, i.e., bloom filters */
+    @Test
+    public void compareBitSets()
+    {
+        BloomFilter bf2 = (BloomFilter) FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, false);
+        BloomFilter bf3 = (BloomFilter) FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, true);
+        int skipEven = KeyGenerator.WordGenerator.WORDS % 2 == 0 ? 0 : 2;
+        WordGenerator gen1 = new KeyGenerator.WordGenerator(skipEven, 2);
+
+        // nothing has been added yet, so the two bitsets must already match.
+        compare(bf2.bitset, bf3.bitset);
+
+        while (gen1.hasNext())
+        {
+            ByteBuffer key = gen1.next();
+            bf2.add(key);
+            bf3.add(key);
+        }
+
+        compare(bf2.bitset, bf3.bitset);
+    }
+
+    private static final String LEGACY_SST_FILE = "test/data/legacy-sstables/hb/Keyspace1/Keyspace1-Standard1-hb-0-Filter.db";
+
+    /** Test compatibility with a 1.1-version data file */
+    @Test
+    public void testExpectedCompatablity() throws IOException
+    {
+        DataInputStream dis = new DataInputStream(new FileInputStream(new File(LEGACY_SST_FILE)));
+        dis.readInt(); // bloom filter hash count
+        OpenBitSet bs = OpenBitSet.deserialize(dis);
+        dis.close();
+
+        dis = new DataInputStream(new FileInputStream(new File(LEGACY_SST_FILE)));
+        dis.readInt(); // bloom filter hash count
+        OffHeapBitSet obs = OffHeapBitSet.deserialize(dis);
+        dis.close();
+
+        compare(obs, bs);
+        obs.close();
+    }
+
+    private static final Random random = new Random();
+
+    /** Test serialization and de-serialization in-memory */
+    @Test
+    public void testOffHeapSerialization() throws IOException
+    {
+        OffHeapBitSet bs = new OffHeapBitSet(100000);
+        populateAndReserialize(bs);
+        bs.close();
+    }
+
+    @Test
+    public void testOffHeapCompatibility() throws IOException
+    {
+        OpenBitSet bs = new OpenBitSet(100000);
+        populateAndReserialize(bs);
+    }
+
+    /** Randomly fills bs, serializes it, reads it back as an OffHeapBitSet and compares the two. */
+    private void populateAndReserialize(IBitSet bs) throws IOException
+    {
+        for (long i = 0; i < bs.capacity(); i++)
+            if (random.nextBoolean())
+                bs.set(i);
+
+        DataOutputBuffer dos = new DataOutputBuffer();
+        bs.serialize(dos);
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.getData(), 0, dos.getLength()));
+        OffHeapBitSet newbs = OffHeapBitSet.deserialize(dis);
+        compare(bs, newbs);
+        newbs.close();
+    }
+
+    private void compare(IBitSet bs, IBitSet newbs)
+    {
+        assertEquals(bs.capacity(), newbs.capacity());
+        for (long i = 0; i < bs.capacity(); i++)
+            Assert.assertEquals(bs.get(i), newbs.get(i));
+    }
+
+    @Test
+    public void testBitClear() throws IOException
+    {
+        int size = Integer.MAX_VALUE / 4000;
+        OffHeapBitSet bitset = new OffHeapBitSet(size);
+        List<Integer> randomBits = Lists.newArrayList();
+        for (int i = 0; i < 10; i++)
+            randomBits.add(random.nextInt(size));
+
+        for (long randomBit : randomBits)
+            bitset.set(randomBit);
+
+        for (long randomBit : randomBits)
+            Assert.assertTrue(bitset.get(randomBit));
+
+        for (long randomBit : randomBits)
+            bitset.clear(randomBit);
+
+        for (long randomBit : randomBits)
+            Assert.assertFalse(bitset.get(randomBit));
+        bitset.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
index d8f596f..292bca6 100644
--- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
+++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
@@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.KeyGenerator.WordGenerator;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -37,7 +38,7 @@ public class BloomFilterTest
 
     public BloomFilterTest()
     {
-        bf = FilterFactory.getFilter(10000L, FilterTestHelper.MAX_FAILURE_RATE);
+        bf = FilterFactory.getFilter(10000L, FilterTestHelper.MAX_FAILURE_RATE, true);
     }
 
     public static Filter testSerialize(Filter f) throws IOException
@@ -47,7 +48,7 @@ public class BloomFilterTest
         FilterFactory.serialize(f, out, FilterFactory.Type.MURMUR3);
 
         ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0, out.getLength());
-        Filter f2 = FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.MURMUR3);
+        Filter f2 = FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.MURMUR3, true);
 
         assert f2.isPresent(ByteBufferUtil.bytes("a"));
         assert !f2.isPresent(ByteBufferUtil.bytes("b"));
@@ -101,7 +102,7 @@ public class BloomFilterTest
         {
             return;
         }
-        Filter bf2 = FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE);
+        Filter bf2 = FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, true);
         int skipEven = KeyGenerator.WordGenerator.WORDS % 2 == 0 ? 0 : 2;
         FilterTestHelper.testFalsePositives(bf2,
                                             new KeyGenerator.WordGenerator(skipEven, 2),
@@ -123,7 +124,7 @@ public class BloomFilterTest
         {
             hashes.clear();
             ByteBuffer buf = keys.next();
-            BloomFilter bf = (BloomFilter) FilterFactory.getFilter(10, 10);
+            BloomFilter bf = (BloomFilter) FilterFactory.getFilter(10, 1, false);
             for (long hashIndex : bf.getHashBuckets(buf, MAX_HASH_COUNT, 1024 * 1024))
             {
                 hashes.add(hashIndex);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java b/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
index 248f325..d92315b 100644
--- a/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
+++ b/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
@@ -46,7 +46,7 @@ public class LegacyBloomFilterTest
         FilterFactory.serialize(f, out, FilterFactory.Type.SHA);
 
         ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0, out.getLength());
-        LegacyBloomFilter f2 = (LegacyBloomFilter) FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.SHA);
+        LegacyBloomFilter f2 = (LegacyBloomFilter) FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.SHA, false);
 
         assert f2.isPresent(ByteBufferUtil.bytes("a"));
         assert !f2.isPresent(ByteBufferUtil.bytes("b"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index 08053df..8d5e88a 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -31,9 +31,9 @@ import java.nio.ByteBuffer;
 public class SerializationsTest extends AbstractSerializationsTester
 {
 
-    private void testBloomFilterWrite(Type murmur) throws IOException
+    private void testBloomFilterWrite(Type murmur, boolean offheap) throws IOException
     {
-        Filter bf = FilterFactory.getFilter(1000000, 0.0001, murmur);
+        Filter bf = FilterFactory.getFilter(1000000, 0.0001, murmur, offheap);
         for (int i = 0; i < 100; i++)
             bf.add(StorageService.getPartitioner().getTokenFactory().toByteArray(StorageService.getPartitioner().getRandomToken()));
         DataOutputStream out = getOutput("utils.BloomFilter.bin");
@@ -45,10 +45,10 @@ public class SerializationsTest extends AbstractSerializationsTester
     public void testBloomFilterReadMURMUR2() throws IOException
     {
         if (EXECUTE_WRITES)
-            testBloomFilterWrite(FilterFactory.Type.MURMUR2);
+            testBloomFilterWrite(FilterFactory.Type.MURMUR2, false);
 
         DataInputStream in = getInput("utils.BloomFilter.bin");
-        assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR2) != null;
+        assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR2, false) != null;
         in.close();
     }
 
@@ -56,10 +56,10 @@ public class SerializationsTest extends AbstractSerializationsTester
     public void testBloomFilterReadMURMUR3() throws IOException
     {
         if (EXECUTE_WRITES)
-            testBloomFilterWrite(FilterFactory.Type.MURMUR3);
+            testBloomFilterWrite(FilterFactory.Type.MURMUR3, true);
 
         DataInputStream in = getInput("utils.BloomFilter.bin");
-        assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR3) != null;
+        assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR3, true) != null;
         in.close();
     }
 
@@ -87,7 +87,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         //      testLegacyBloomFilterWrite();
         
         DataInputStream in = getInput("utils.LegacyBloomFilter.bin");
-        assert FilterFactory.deserialize(in, FilterFactory.Type.SHA) != null;
+        assert FilterFactory.deserialize(in, FilterFactory.Type.SHA, false) != null;
         in.close();
     }