You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/10/31 17:23:30 UTC
git commit: off-heap bloom filters for row keys patch by vijay;
reviewed by jbellis for CASSANDRA-4865
Updated Branches:
refs/heads/trunk c34ecbf68 -> dc37dea74
off-heap bloom filters for row keys
patch by vijay; reviewed by jbellis for CASSANDRA-4865
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc37dea7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc37dea7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc37dea7
Branch: refs/heads/trunk
Commit: dc37dea745fe89d70819d649c823d9bfcb0d7577
Parents: c34ecbf
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 31 10:54:47 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 31 11:22:47 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ColumnIndex.java | 2 +-
.../org/apache/cassandra/db/RowIndexEntry.java | 2 +-
.../apache/cassandra/io/sstable/IndexHelper.java | 2 +-
.../apache/cassandra/io/sstable/SSTableReader.java | 4 +-
.../apache/cassandra/io/sstable/SSTableWriter.java | 4 +-
src/java/org/apache/cassandra/io/util/Memory.java | 20 ++
.../org/apache/cassandra/utils/BloomFilter.java | 25 +--
.../cassandra/utils/BloomFilterSerializer.java | 51 +----
src/java/org/apache/cassandra/utils/Filter.java | 3 +-
.../org/apache/cassandra/utils/FilterFactory.java | 36 ++--
.../apache/cassandra/utils/LegacyBloomFilter.java | 6 +
.../apache/cassandra/utils/Murmur2BloomFilter.java | 16 +-
.../apache/cassandra/utils/Murmur3BloomFilter.java | 16 +-
.../org/apache/cassandra/utils/obs/IBitSet.java | 52 +++++
.../apache/cassandra/utils/obs/OffHeapBitSet.java | 160 +++++++++++++++
.../org/apache/cassandra/utils/obs/OpenBitSet.java | 60 +++++-
.../org/apache/cassandra/utils/LongBitSetTest.java | 133 ++++++++++++
.../cassandra/utils/LongBloomFilterTest.java | 6 +-
.../org/apache/cassandra/utils/BitSetTest.java | 148 +++++++++++++
.../apache/cassandra/utils/BloomFilterTest.java | 9 +-
.../cassandra/utils/LegacyBloomFilterTest.java | 2 +-
.../apache/cassandra/utils/SerializationsTest.java | 14 +-
23 files changed, 657 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 11aaea1..7f4a728 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-beta2
+ * off-heap bloom filters for row keys (CASSANDRA_4865)
* add extension point for sstable components (CASSANDRA-4049)
* improve tracing output (CASSANDRA-4852, 4862)
* make TRACE verb droppable (CASSANDRA-4672)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 35ee899..946c4f4 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -36,7 +36,7 @@ public class ColumnIndex
private ColumnIndex(int estimatedColumnCount)
{
- this(new ArrayList<IndexHelper.IndexInfo>(), FilterFactory.getFilter(estimatedColumnCount, 4));
+ this(new ArrayList<IndexHelper.IndexInfo>(), FilterFactory.getFilter(estimatedColumnCount, 4, false));
}
private ColumnIndex(List<IndexHelper.IndexInfo> columnsIndex, Filter bloomFilter)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index b7660e5..a3701f8 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -119,7 +119,7 @@ public class RowIndexEntry
List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<IndexHelper.IndexInfo>(entries);
for (int i = 0; i < entries; i++)
columnsIndex.add(IndexHelper.IndexInfo.deserialize(dis));
- Filter bf = FilterFactory.deserialize(dis, version.filterType);
+ Filter bf = FilterFactory.deserialize(dis, version.filterType, false);
return new IndexedEntry(position, delInfo, columnsIndex, bf);
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 29e076a..a87ecf7 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -130,7 +130,7 @@ public class IndexHelper
ByteBuffer bytes = file.readBytes(size);
DataInputStream stream = new DataInputStream(ByteBufferUtil.inputStream(bytes));
- return FilterFactory.deserialize(stream, type);
+ return FilterFactory.deserialize(stream, type, false);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 7957134..812a475 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -330,7 +330,7 @@ public class SSTableReader extends SSTable
try
{
stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
- bf = FilterFactory.deserialize(stream, descriptor.version.filterType);
+ bf = FilterFactory.deserialize(stream, descriptor.version.filterType, true);
}
finally
{
@@ -899,6 +899,8 @@ public class SSTableReader extends SSTable
dfile.cleanup();
deletingTask.schedule();
+ // close the BF so it can be opened later.
+ FileUtils.closeQuietly(bf);
}
assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index c17de4c..2627a77 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -445,8 +445,8 @@ public class SSTableWriter extends SSTable
logger.error("Bloom filter FP chance of zero isn't supposed to happen");
fpChance = null;
}
- bf = fpChance == null ? FilterFactory.getFilter(keyCount, 15)
- : FilterFactory.getFilter(keyCount, fpChance);
+ bf = fpChance == null ? FilterFactory.getFilter(keyCount, 15, true)
+ : FilterFactory.getFilter(keyCount, fpChance, true);
}
public void append(DecoratedKey key, RowIndexEntry indexEntry)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index faef564..25f5caf 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -66,6 +66,13 @@ public class Memory
unsafe.putByte(peer + offset, b);
}
+ public void setMemory(long offset, long bytes, byte b)
+ {
+ // check if the last element will fit into the memory
+ checkPosition(offset + bytes - 1);
+ unsafe.setMemory(peer + offset, bytes, b);
+ }
+
/**
* Transfers count bytes from buffer to Memory
*
@@ -139,5 +146,18 @@ public class Memory
{
return size;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (!(o instanceof Memory))
+ return false;
+ Memory b = (Memory) o;
+ if (peer == b.peer && size == b.size)
+ return true;
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 3ca62e5..469763a 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -17,23 +17,16 @@
*/
package org.apache.cassandra.utils;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.apache.cassandra.utils.obs.IBitSet;
public abstract class BloomFilter extends Filter
{
- private static final int EXCESS = 20;
+ public final IBitSet bitset;
- public final OpenBitSet bitset;
-
- BloomFilter(int hashes, long numElements, int bucketsPer)
- {
- hashCount = hashes;
- bitset = new OpenBitSet(numElements * bucketsPer + EXCESS);
- }
-
- BloomFilter(int hashes, OpenBitSet bitset)
+ BloomFilter(int hashes, IBitSet bitset)
{
this.hashCount = hashes;
this.bitset = bitset;
@@ -41,7 +34,7 @@ public abstract class BloomFilter extends Filter
private long[] getHashBuckets(ByteBuffer key)
{
- return getHashBuckets(key, hashCount, bitset.size());
+ return getHashBuckets(key, hashCount, bitset.capacity());
}
protected abstract long[] hash(ByteBuffer b, int position, int remaining, long seed);
@@ -84,6 +77,12 @@ public abstract class BloomFilter extends Filter
public void clear()
{
- bitset.clear(0, bitset.size());
+ bitset.clear();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ bitset.close();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
index 68997c9..6b8b355 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
@@ -23,46 +23,31 @@ import java.io.IOException;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.utils.obs.IBitSet;
+import org.apache.cassandra.utils.obs.OffHeapBitSet;
import org.apache.cassandra.utils.obs.OpenBitSet;
abstract class BloomFilterSerializer implements ISerializer<BloomFilter>
{
public void serialize(BloomFilter bf, DataOutput dos) throws IOException
{
- int bitLength = bf.bitset.getNumWords();
- int pageSize = bf.bitset.getPageSize();
- int pageCount = bf.bitset.getPageCount();
-
dos.writeInt(bf.getHashCount());
- dos.writeInt(bitLength);
-
- for (int p = 0; p < pageCount; p++)
- {
- long[] bits = bf.bitset.getPage(p);
- for (int i = 0; i < pageSize && bitLength-- > 0; i++)
- dos.writeLong(bits[i]);
- }
+ bf.bitset.serialize(dos);
}
public BloomFilter deserialize(DataInput dis) throws IOException
{
- int hashes = dis.readInt();
- long bitLength = dis.readInt();
- OpenBitSet bs = new OpenBitSet(bitLength << 6);
- int pageSize = bs.getPageSize();
- int pageCount = bs.getPageCount();
-
- for (int p = 0; p < pageCount; p++)
- {
- long[] bits = bs.getPage(p);
- for (int i = 0; i < pageSize && bitLength-- > 0; i++)
- bits[i] = dis.readLong();
- }
+ return deserialize(dis, false);
+ }
+ public BloomFilter deserialize(DataInput dis, boolean offheap) throws IOException
+ {
+ int hashes = dis.readInt();
+ IBitSet bs = offheap ? OffHeapBitSet.deserialize(dis) : OpenBitSet.deserialize(dis);
return createFilter(hashes, bs);
}
- protected abstract BloomFilter createFilter(int hashes, OpenBitSet bs);
+ protected abstract BloomFilter createFilter(int hashes, IBitSet bs);
/**
* Calculates a serialized size of the given Bloom Filter
@@ -74,20 +59,8 @@ abstract class BloomFilterSerializer implements ISerializer<BloomFilter>
*/
public long serializedSize(BloomFilter bf, TypeSizes typeSizes)
{
- int bitLength = bf.bitset.getNumWords();
- int pageSize = bf.bitset.getPageSize();
- int pageCount = bf.bitset.getPageCount();
-
- int size = 0;
- size += typeSizes.sizeof(bf.getHashCount()); // hash count
- size += typeSizes.sizeof(bitLength); // length
-
- for (int p = 0; p < pageCount; p++)
- {
- long[] bits = bf.bitset.getPage(p);
- for (int i = 0; i < pageSize && bitLength-- > 0; i++)
- size += typeSizes.sizeof(bits[i]); // bucket
- }
+ int size = typeSizes.sizeof(bf.getHashCount()); // hash count
+ size += bf.bitset.serializedSize(typeSizes);
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Filter.java b/src/java/org/apache/cassandra/utils/Filter.java
index f7ce1f3..ea98401 100644
--- a/src/java/org/apache/cassandra/utils/Filter.java
+++ b/src/java/org/apache/cassandra/utils/Filter.java
@@ -17,9 +17,10 @@
*/
package org.apache.cassandra.utils;
+import java.io.Closeable;
import java.nio.ByteBuffer;
-public abstract class Filter
+public abstract class Filter implements Closeable
{
int hashCount;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 2c9bcf4..3eae519 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -22,6 +22,9 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.obs.IBitSet;
+import org.apache.cassandra.utils.obs.OffHeapBitSet;
+import org.apache.cassandra.utils.obs.OpenBitSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +33,7 @@ public class FilterFactory
{
private static final Logger logger = LoggerFactory.getLogger(FilterFactory.class);
private static final TypeSizes TYPE_SIZES = TypeSizes.NATIVE;
+ private static final long BITSET_EXCESS = 20;
public enum Type
{
@@ -57,16 +61,16 @@ public class FilterFactory
}
}
- public static Filter deserialize(DataInput input, Type type) throws IOException
+ public static Filter deserialize(DataInput input, Type type, boolean offheap) throws IOException
{
switch (type)
{
case SHA:
return LegacyBloomFilter.serializer.deserialize(input);
case MURMUR2:
- return Murmur2BloomFilter.serializer.deserialize(input);
+ return Murmur2BloomFilter.serializer.deserialize(input, offheap);
default:
- return Murmur3BloomFilter.serializer.deserialize(input);
+ return Murmur3BloomFilter.serializer.deserialize(input, offheap);
}
}
@@ -92,13 +96,13 @@ public class FilterFactory
* @return A BloomFilter with the lowest practical false positive
* probability for the given number of elements.
*/
- public static Filter getFilter(long numElements, int targetBucketsPerElem)
+ public static Filter getFilter(long numElements, int targetBucketsPerElem, boolean offheap)
{
- return getFilter(numElements, targetBucketsPerElem, Type.MURMUR3);
+ return getFilter(numElements, targetBucketsPerElem, Type.MURMUR3, offheap);
}
// helper method for test.
- static Filter getFilter(long numElements, int targetBucketsPerElem, Type type)
+ static Filter getFilter(long numElements, int targetBucketsPerElem, Type type, boolean offheap)
{
int maxBucketsPerElement = Math.max(1, BloomCalculations.maxBucketsPerElement(numElements));
int bucketsPerElement = Math.min(targetBucketsPerElem, maxBucketsPerElement);
@@ -107,7 +111,7 @@ public class FilterFactory
logger.warn(String.format("Cannot provide an optimal BloomFilter for %d elements (%d/%d buckets per element).", numElements, bucketsPerElement, targetBucketsPerElem));
}
BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement);
- return createFilter(spec.K, numElements, spec.bucketsPerElement, type);
+ return createFilter(spec.K, numElements, spec.bucketsPerElement, type, offheap);
}
/**
@@ -117,33 +121,35 @@ public class FilterFactory
* Asserts that the given probability can be satisfied using this
* filter.
*/
- public static Filter getFilter(long numElements, double maxFalsePosProbability)
+ public static Filter getFilter(long numElements, double maxFalsePosProbability, boolean offheap)
{
- return getFilter(numElements, maxFalsePosProbability, Type.MURMUR3);
+ return getFilter(numElements, maxFalsePosProbability, Type.MURMUR3, offheap);
}
// helper method for test.
- static Filter getFilter(long numElements, double maxFalsePosProbability, Type type)
+ static Filter getFilter(long numElements, double maxFalsePosProbability, Type type, boolean offheap)
{
assert maxFalsePosProbability <= 1.0 : "Invalid probability";
int bucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement, maxFalsePosProbability);
- return createFilter(spec.K, numElements, spec.bucketsPerElement, type);
+ return createFilter(spec.K, numElements, spec.bucketsPerElement, type, offheap);
}
- private static Filter createFilter(int hash, long numElements, int bucketsPer, Type type)
+ private static Filter createFilter(int hash, long numElements, int bucketsPer, Type type, boolean offheap)
{
+ long numBits = (numElements * bucketsPer) + BITSET_EXCESS;
+ IBitSet bitset = offheap ? new OffHeapBitSet(numBits) : new OpenBitSet(numBits);
switch (type)
{
case MURMUR2:
- return new Murmur2BloomFilter(hash, numElements, bucketsPer);
+ return new Murmur2BloomFilter(hash, bitset);
default:
- return new Murmur3BloomFilter(hash, numElements, bucketsPer);
+ return new Murmur3BloomFilter(hash, bitset);
}
}
public static BloomFilter emptyFilter()
{
- return new Murmur3BloomFilter(0, 0, 0);
+ return new Murmur3BloomFilter(0, new OpenBitSet(BITSET_EXCESS));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java b/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java
index 6f7269e..a50e2c8 100644
--- a/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.utils;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
@@ -160,4 +161,9 @@ public class LegacyBloomFilter extends Filter
public BitSet getBitSet(){
return filter;
}
+
+ public void close() throws IOException
+ {
+ // Do nothing for this
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java
index df5a160..1c20dd1 100644
--- a/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java
@@ -19,19 +19,13 @@ package org.apache.cassandra.utils;
import java.nio.ByteBuffer;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.apache.cassandra.utils.obs.IBitSet;
public class Murmur2BloomFilter extends BloomFilter
{
- public static final ISerializer<BloomFilter> serializer = new Murmur2BloomFilterSerializer();
+ public static final Murmur2BloomFilterSerializer serializer = new Murmur2BloomFilterSerializer();
- Murmur2BloomFilter(int hashes, long numElements, int bucketsPer)
- {
- super(hashes, numElements, bucketsPer);
- }
-
- private Murmur2BloomFilter(int hashes, OpenBitSet bs)
+ public Murmur2BloomFilter(int hashes, IBitSet bs)
{
super(hashes, bs);
}
@@ -43,9 +37,9 @@ public class Murmur2BloomFilter extends BloomFilter
return (new long[] { hash1, hash2 });
}
- private static class Murmur2BloomFilterSerializer extends BloomFilterSerializer
+ public static class Murmur2BloomFilterSerializer extends BloomFilterSerializer
{
- protected BloomFilter createFilter(int hashes, OpenBitSet bs)
+ protected BloomFilter createFilter(int hashes, IBitSet bs)
{
return new Murmur2BloomFilter(hashes, bs);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
index 304842a..ebd506c 100644
--- a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
@@ -19,19 +19,13 @@ package org.apache.cassandra.utils;
import java.nio.ByteBuffer;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.apache.cassandra.utils.obs.IBitSet;
public class Murmur3BloomFilter extends BloomFilter
{
- public static final ISerializer<BloomFilter> serializer = new Murmur3BloomFilterSerializer();
+ public static final Murmur3BloomFilterSerializer serializer = new Murmur3BloomFilterSerializer();
- Murmur3BloomFilter(int hashes, long numElements, int bucketsPer)
- {
- super(hashes, numElements, bucketsPer);
- }
-
- private Murmur3BloomFilter(int hashes, OpenBitSet bs)
+ public Murmur3BloomFilter(int hashes, IBitSet bs)
{
super(hashes, bs);
}
@@ -41,9 +35,9 @@ public class Murmur3BloomFilter extends BloomFilter
return MurmurHash.hash3_x64_128(b, b.position(), b.remaining(), seed);
}
- private static class Murmur3BloomFilterSerializer extends BloomFilterSerializer
+ public static class Murmur3BloomFilterSerializer extends BloomFilterSerializer
{
- protected BloomFilter createFilter(int hashes, OpenBitSet bs)
+ protected BloomFilter createFilter(int hashes, IBitSet bs)
{
return new Murmur3BloomFilter(hashes, bs);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
new file mode 100644
index 0000000..c5a2fb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.obs;
+
+import java.io.Closeable;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+
+public interface IBitSet extends Closeable
+{
+ public long capacity();
+
+ /**
+ * Returns true or false for the specified bit index. The index should be
+ * less than the capacity.
+ */
+ public boolean get(long index);
+
+ /**
+ * Sets the bit at the specified index. The index should be less than the
+ * capacity.
+ */
+ public void set(long index);
+
+ /**
+ * clears the bit. The index should be less than the capacity.
+ */
+ public void clear(long index);
+
+ public void serialize(DataOutput dos) throws IOException;
+
+ public long serializedSize(TypeSizes type);
+
+ public void clear();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
new file mode 100644
index 0000000..1733a81
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.obs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.cache.RefCountedMemory;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.Memory;
+
+/**
+ * Off-heap bitset,
+ * file compatible with OpeBitSet
+ */
+public class OffHeapBitSet implements IBitSet
+{
+ private final Memory bytes;
+
+ public OffHeapBitSet(long numBits)
+ {
+ // OpenBitSet.bits2words calculation is there for backward compatibility.
+ int byteCount = OpenBitSet.bits2words(numBits) * 8;
+ bytes = RefCountedMemory.allocate(byteCount);
+ // flush/clear the existing memory.
+ clear();
+ }
+
+ private OffHeapBitSet(Memory bytes)
+ {
+ this.bytes = bytes;
+ }
+
+ public long capacity()
+ {
+ return bytes.size() * 8;
+ }
+
+ public boolean get(long index)
+ {
+ long i = index >> 3;
+ long bit = index & 0x7;
+ int bitmask = 0x1 << bit;
+ return ((bytes.getByte(i) & 0xFF) & bitmask) != 0;
+ }
+
+ public void set(long index)
+ {
+ long i = index >> 3;
+ long bit = index & 0x7;
+ int bitmask = 0x1 << bit;
+ bytes.setByte(i, (byte) (bitmask | bytes.getByte(i)));
+ }
+
+ public void set(long offset, byte b)
+ {
+ bytes.setByte(offset, b);
+ }
+
+ public void clear(long index)
+ {
+ long i = index >> 3;
+ long bit = index & 0x7;
+ int bitmask = 0x1 << bit;
+ int nativeByte = (bytes.getByte(i) & 0xFF);
+ nativeByte &= ~bitmask;
+ bytes.setByte(i, (byte) nativeByte);
+ }
+
+ public void clear()
+ {
+ bytes.setMemory(0, bytes.size(), (byte) 0);
+ }
+
+ public void serialize(DataOutput dos) throws IOException
+ {
+ dos.writeInt((int) (bytes.size() / 8));
+ for (long i = 0; i < bytes.size();)
+ {
+ long value = ((bytes.getByte(i++) & 0xff) << 0)
+ + ((bytes.getByte(i++) & 0xff) << 8)
+ + ((bytes.getByte(i++) & 0xff) << 16)
+ + ((long) (bytes.getByte(i++) & 0xff) << 24)
+ + ((long) (bytes.getByte(i++) & 0xff) << 32)
+ + ((long) (bytes.getByte(i++) & 0xff) << 40)
+ + ((long) (bytes.getByte(i++) & 0xff) << 48)
+ + ((long) bytes.getByte(i++) << 56);
+ dos.writeLong(value);
+ }
+ }
+
+ public long serializedSize(TypeSizes type)
+ {
+ return type.sizeof((int) bytes.size()) + bytes.size();
+ }
+
+ public static OffHeapBitSet deserialize(DataInput dis) throws IOException
+ {
+ int byteCount = dis.readInt() * 8;
+ Memory memory = RefCountedMemory.allocate(byteCount);
+ for (int i = 0; i < byteCount;)
+ {
+ long v = dis.readLong();
+ memory.setByte(i++, (byte) (v >>> 0));
+ memory.setByte(i++, (byte) (v >>> 8));
+ memory.setByte(i++, (byte) (v >>> 16));
+ memory.setByte(i++, (byte) (v >>> 24));
+ memory.setByte(i++, (byte) (v >>> 32));
+ memory.setByte(i++, (byte) (v >>> 40));
+ memory.setByte(i++, (byte) (v >>> 48));
+ memory.setByte(i++, (byte) (v >>> 56));
+ }
+ return new OffHeapBitSet(memory);
+ }
+
+ public void close() throws IOException
+ {
+ bytes.free();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (!(o instanceof OffHeapBitSet))
+ return false;
+ OffHeapBitSet b = (OffHeapBitSet) o;
+ return bytes.equals(b.bytes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ // Similar to open bitset.
+ long h = 0;
+ for (long i = bytes.size(); --i >= 0;)
+ {
+ h ^= bytes.getByte(i);
+ h = (h << 1) | (h >>> 63); // rotate left
+ }
+ return (int) ((h >> 32) ^ h) + 0x98761234;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 1ddbe8f..4fce3f8 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.utils.obs;
import java.util.Arrays;
-import java.io.Serializable;
-import java.util.BitSet;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
/**
* An "open" BitSet implementation that allows direct access to the arrays of words
@@ -43,7 +46,8 @@ import java.util.BitSet;
* class, use <code>java.util.BitSet</code>.
*/
-public class OpenBitSet implements Serializable {
+public class OpenBitSet implements IBitSet
+{
/**
* We break the bitset up into multiple arrays to avoid promotion failure caused by attempting to allocate
* large, contiguous arrays (CASSANDRA-2466). All sub-arrays but the last are uniformly PAGE_SIZE words;
@@ -302,7 +306,7 @@ public class OpenBitSet implements Serializable {
int newLen= Math.min(this.wlen,other.wlen);
long[][] thisArr = this.bits;
long[][] otherArr = other.bits;
- int thisPageSize = this.PAGE_SIZE;
+ int thisPageSize = PAGE_SIZE;
int otherPageSize = other.PAGE_SIZE;
// testing against zero can be more efficient
int pos=newLen;
@@ -383,6 +387,54 @@ public class OpenBitSet implements Serializable {
return (int)((h>>32) ^ h) + 0x98761234;
}
+ public void close() throws IOException {
+ // noop, let GC do the cleanup.
+ }
+
+ public void serialize(DataOutput dos) throws IOException {
+ int bitLength = getNumWords();
+ int pageSize = getPageSize();
+ int pageCount = getPageCount();
+
+ dos.writeInt(bitLength);
+ for (int p = 0; p < pageCount; p++) {
+ long[] bits = getPage(p);
+ for (int i = 0; i < pageSize && bitLength-- > 0; i++) {
+ dos.writeLong(bits[i]);
+ }
+ }
}
+ public long serializedSize(TypeSizes type) {
+ int bitLength = getNumWords();
+ int pageSize = getPageSize();
+ int pageCount = getPageCount();
+
+ long size = type.sizeof(bitLength); // length
+ for (int p = 0; p < pageCount; p++) {
+ long[] bits = getPage(p);
+ for (int i = 0; i < pageSize && bitLength-- > 0; i++)
+ size += type.sizeof(bits[i]); // bucket
+ }
+ return size;
+ }
+
+ public void clear() {
+ clear(0, capacity());
+ }
+
+ public static OpenBitSet deserialize(DataInput dis) throws IOException {
+ long bitLength = dis.readInt();
+
+ OpenBitSet bs = new OpenBitSet(bitLength << 6);
+ int pageSize = bs.getPageSize();
+ int pageCount = bs.getPageCount();
+ for (int p = 0; p < pageCount; p++) {
+ long[] bits = bs.getPage(p);
+ for (int i = 0; i < pageSize && bitLength-- > 0; i++)
+ bits[i] = dis.readLong();
+ }
+ return bs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/long/org/apache/cassandra/utils/LongBitSetTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBitSetTest.java b/test/long/org/apache/cassandra/utils/LongBitSetTest.java
new file mode 100644
index 0000000..7941fae
--- /dev/null
+++ b/test/long/org/apache/cassandra/utils/LongBitSetTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.cassandra.utils.obs.OffHeapBitSet;
+import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LongBitSetTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(LongBitSetTest.class);
+ private static final Random random = new Random();
+
+ public void populateRandom(OffHeapBitSet offbs, OpenBitSet obs, long index)
+ {
+ if (random.nextBoolean())
+ {
+ offbs.set(index);
+ obs.set(index);
+ }
+ }
+
+ public void compare(OffHeapBitSet offbs, OpenBitSet obs, long index)
+ {
+ if (offbs.get(index) != obs.get(index))
+ throw new RuntimeException();
+ Assert.assertEquals(offbs.get(index), obs.get(index));
+ }
+
+ @Test
+ public void testBitSetOperations()
+ {
+ long size_to_test = Integer.MAX_VALUE / 40;
+ long size_and_excess = size_to_test + 20;
+ OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess);
+ OpenBitSet obs = new OpenBitSet(size_and_excess);
+ for (long i = 0; i < size_to_test; i++)
+ populateRandom(offbs, obs, i);
+
+ for (long i = 0; i < size_to_test; i++)
+ compare(offbs, obs, i);
+ }
+
+ @Test
+ public void timeit()
+ {
+ long size_to_test = Integer.MAX_VALUE / 10; // about 214 million
+ long size_and_excess = size_to_test + 20;
+
+ OpenBitSet obs = new OpenBitSet(size_and_excess);
+ OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess);
+ logger.info("||Open BS set's|Open BS get's|Open BS clear's|Offheap BS set's|Offheap BS get's|Offheap BS clear's|");
+ // System.out.println("||Open BS set's|Open BS get's|Open BS clear's|Offheap BS set's|Offheap BS get's|Offheap BS clear's|");
+ loopOnce(obs, offbs, size_to_test);
+ }
+
+ public void loopOnce(OpenBitSet obs, OffHeapBitSet offbs, long size_to_test)
+ {
+ StringBuffer buffer = new StringBuffer();
+ // start off fresh.
+ System.gc();
+ long start = System.currentTimeMillis();
+ for (long i = 0; i < size_to_test; i++)
+ obs.set(i);
+ buffer.append("||").append(System.currentTimeMillis() - start);
+
+ start = System.currentTimeMillis();
+ for (long i = 0; i < size_to_test; i++)
+ obs.get(i);
+ buffer.append("|").append(System.currentTimeMillis() - start);
+
+ start = System.currentTimeMillis();
+ for (long i = 0; i < size_to_test; i++)
+ obs.clear(i);
+ buffer.append("|").append(System.currentTimeMillis() - start);
+
+ System.gc();
+ start = System.currentTimeMillis();
+ for (long i = 0; i < size_to_test; i++)
+ offbs.set(i);
+ buffer.append("|").append(System.currentTimeMillis() - start);
+
+ start = System.currentTimeMillis();
+ for (long i = 0; i < size_to_test; i++)
+ offbs.get(i);
+
+ buffer.append("|").append(System.currentTimeMillis() - start);
+ start = System.currentTimeMillis();
+ for (long i = 0; i < size_to_test; i++)
+ offbs.clear(i);
+ buffer.append("|").append(System.currentTimeMillis() - start).append("|");
+ logger.info(buffer.toString());
+ // System.out.println(buffer.toString());
+ }
+
+ /**
+ * Just to make sure JIT doesn't come on our way
+ */
+ @Test
+ // @Ignore
+ public void loopIt()
+ {
+ long size_to_test = Integer.MAX_VALUE / 10; // about 214 million
+ long size_and_excess = size_to_test + 20;
+
+ OpenBitSet obs = new OpenBitSet(size_and_excess);
+ OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess);
+ for (int i = 0; i < 10; i++)
+ // 10 times to do approx 2B keys each.
+ loopOnce(obs, offbs, size_to_test);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java b/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java
index d4a4c34..06ad642 100644
--- a/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java
+++ b/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java
@@ -34,7 +34,7 @@ public class LongBloomFilterTest
public void testBigInt(FilterFactory.Type type)
{
int size = 10 * 1000 * 1000;
- Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type);
+ Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false);
double fp = FilterTestHelper.testFalsePositives(bf, new KeyGenerator.IntGenerator(size),
new KeyGenerator.IntGenerator(size, size * 2));
logger.info("Bloom filter false positive: {}", fp);
@@ -43,7 +43,7 @@ public class LongBloomFilterTest
public void testBigRandom(FilterFactory.Type type)
{
int size = 10 * 1000 * 1000;
- Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type);
+ Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false);
double fp = FilterTestHelper.testFalsePositives(bf, new KeyGenerator.RandomStringGenerator(new Random().nextInt(), size),
new KeyGenerator.RandomStringGenerator(new Random().nextInt(), size));
logger.info("Bloom filter false positive: {}", fp);
@@ -52,7 +52,7 @@ public class LongBloomFilterTest
public void timeit(FilterFactory.Type type)
{
int size = 300 * FilterTestHelper.ELEMENTS;
- Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type);
+ Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false);
double sumfp = 0;
for (int i = 0; i < 10; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/BitSetTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BitSetTest.java b/test/unit/org/apache/cassandra/utils/BitSetTest.java
new file mode 100644
index 0000000..9684131
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/BitSetTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.KeyGenerator.WordGenerator;
+import org.apache.cassandra.utils.obs.IBitSet;
+import org.apache.cassandra.utils.obs.OffHeapBitSet;
+import org.apache.cassandra.utils.obs.OpenBitSet;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import static junit.framework.Assert.assertEquals;
+
+public class BitSetTest
+{
+ /**
+ * Test bitsets in a "real-world" environment, i.e., bloom filters
+ */
+ @Test
+ public void compareBitSets()
+ {
+ BloomFilter bf2 = (BloomFilter) FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, false);
+ BloomFilter bf3 = (BloomFilter) FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, true);
+ int skipEven = KeyGenerator.WordGenerator.WORDS % 2 == 0 ? 0 : 2;
+ WordGenerator gen1 = new KeyGenerator.WordGenerator(skipEven, 2);
+
+ // make sure both bitsets are empty.
+ compare(bf2.bitset, bf3.bitset);
+
+ while (gen1.hasNext())
+ {
+ ByteBuffer key = gen1.next();
+ bf2.add(key);
+ bf3.add(key);
+ }
+
+ compare(bf2.bitset, bf3.bitset);
+ }
+
+ private static final String LEGACY_SST_FILE = "test/data/legacy-sstables/hb/Keyspace1/Keyspace1-Standard1-hb-0-Filter.db";
+
+ /**
+ * Test compatibility with a 1.1-version data file
+ */
+ @Test
+ public void testExpectedCompatablity() throws IOException
+ {
+ DataInputStream dis = new DataInputStream(new FileInputStream(new File(LEGACY_SST_FILE)));
+ dis.readInt(); // bloom filter hash count
+ OpenBitSet bs = OpenBitSet.deserialize(dis);
+
+ dis = new DataInputStream(new FileInputStream(new File(LEGACY_SST_FILE)));
+ dis.readInt(); // bloom filter hash count
+ OffHeapBitSet obs = OffHeapBitSet.deserialize(dis);
+
+ compare(obs, bs);
+ }
+
+ private static final Random random = new Random();
+
+ /**
+ * Test serialization and de-serialization in-memory
+ */
+ @Test
+ public void testOffHeapSerialization() throws IOException
+ {
+ OffHeapBitSet bs = new OffHeapBitSet(100000);
+ populateAndReserialize(bs);
+ }
+
+ @Test
+ public void testOffHeapCompatibility() throws IOException
+ {
+ OpenBitSet bs = new OpenBitSet(100000);
+ populateAndReserialize(bs);
+ }
+
+ private void populateAndReserialize(IBitSet bs) throws IOException
+ {
+ for (long i = 0; i < bs.capacity(); i++)
+ if (random.nextBoolean())
+ bs.set(i);
+
+ DataOutputBuffer dos = new DataOutputBuffer();
+ bs.serialize(dos);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.getData()));
+ OffHeapBitSet newbs = OffHeapBitSet.deserialize(dis);
+ compare(bs, newbs);
+ }
+
+ private void compare(IBitSet bs, IBitSet newbs)
+ {
+ assertEquals(bs.capacity(), newbs.capacity());
+ for (long i = 0; i < bs.capacity(); i++)
+ Assert.assertEquals(bs.get(i), newbs.get(i));
+ }
+
+ @Test
+ public void testBitClear() throws IOException
+ {
+ int size = Integer.MAX_VALUE / 4000;
+ OffHeapBitSet bitset = new OffHeapBitSet(size);
+ List<Integer> randomBits = Lists.newArrayList();
+ for (int i = 0; i < 10; i++)
+ randomBits.add(random.nextInt(size));
+
+ for (long randomBit : randomBits)
+ bitset.set(randomBit);
+
+ for (long randomBit : randomBits)
+ Assert.assertEquals(true, bitset.get(randomBit));
+
+ for (long randomBit : randomBits)
+ bitset.clear(randomBit);
+
+ for (long randomBit : randomBits)
+ Assert.assertEquals(false, bitset.get(randomBit));
+ bitset.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
index d8f596f..292bca6 100644
--- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
+++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
@@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.KeyGenerator.WordGenerator;
import org.junit.Before;
import org.junit.Test;
@@ -37,7 +38,7 @@ public class BloomFilterTest
public BloomFilterTest()
{
- bf = FilterFactory.getFilter(10000L, FilterTestHelper.MAX_FAILURE_RATE);
+ bf = FilterFactory.getFilter(10000L, FilterTestHelper.MAX_FAILURE_RATE, true);
}
public static Filter testSerialize(Filter f) throws IOException
@@ -47,7 +48,7 @@ public class BloomFilterTest
FilterFactory.serialize(f, out, FilterFactory.Type.MURMUR3);
ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0, out.getLength());
- Filter f2 = FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.MURMUR3);
+ Filter f2 = FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.MURMUR3, true);
assert f2.isPresent(ByteBufferUtil.bytes("a"));
assert !f2.isPresent(ByteBufferUtil.bytes("b"));
@@ -101,7 +102,7 @@ public class BloomFilterTest
{
return;
}
- Filter bf2 = FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE);
+ Filter bf2 = FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, true);
int skipEven = KeyGenerator.WordGenerator.WORDS % 2 == 0 ? 0 : 2;
FilterTestHelper.testFalsePositives(bf2,
new KeyGenerator.WordGenerator(skipEven, 2),
@@ -123,7 +124,7 @@ public class BloomFilterTest
{
hashes.clear();
ByteBuffer buf = keys.next();
- BloomFilter bf = (BloomFilter) FilterFactory.getFilter(10, 10);
+ BloomFilter bf = (BloomFilter) FilterFactory.getFilter(10, 1, false);
for (long hashIndex : bf.getHashBuckets(buf, MAX_HASH_COUNT, 1024 * 1024))
{
hashes.add(hashIndex);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java b/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
index 248f325..d92315b 100644
--- a/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
+++ b/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
@@ -46,7 +46,7 @@ public class LegacyBloomFilterTest
FilterFactory.serialize(f, out, FilterFactory.Type.SHA);
ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0, out.getLength());
- LegacyBloomFilter f2 = (LegacyBloomFilter) FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.SHA);
+ LegacyBloomFilter f2 = (LegacyBloomFilter) FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.SHA, false);
assert f2.isPresent(ByteBufferUtil.bytes("a"));
assert !f2.isPresent(ByteBufferUtil.bytes("b"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index 08053df..8d5e88a 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -31,9 +31,9 @@ import java.nio.ByteBuffer;
public class SerializationsTest extends AbstractSerializationsTester
{
- private void testBloomFilterWrite(Type murmur) throws IOException
+ private void testBloomFilterWrite(Type murmur, boolean offheap) throws IOException
{
- Filter bf = FilterFactory.getFilter(1000000, 0.0001, murmur);
+ Filter bf = FilterFactory.getFilter(1000000, 0.0001, murmur, offheap);
for (int i = 0; i < 100; i++)
bf.add(StorageService.getPartitioner().getTokenFactory().toByteArray(StorageService.getPartitioner().getRandomToken()));
DataOutputStream out = getOutput("utils.BloomFilter.bin");
@@ -45,10 +45,10 @@ public class SerializationsTest extends AbstractSerializationsTester
public void testBloomFilterReadMURMUR2() throws IOException
{
if (EXECUTE_WRITES)
- testBloomFilterWrite(FilterFactory.Type.MURMUR2);
+ testBloomFilterWrite(FilterFactory.Type.MURMUR2, false);
DataInputStream in = getInput("utils.BloomFilter.bin");
- assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR2) != null;
+ assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR2, false) != null;
in.close();
}
@@ -56,10 +56,10 @@ public class SerializationsTest extends AbstractSerializationsTester
public void testBloomFilterReadMURMUR3() throws IOException
{
if (EXECUTE_WRITES)
- testBloomFilterWrite(FilterFactory.Type.MURMUR3);
+ testBloomFilterWrite(FilterFactory.Type.MURMUR3, true);
DataInputStream in = getInput("utils.BloomFilter.bin");
- assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR3) != null;
+ assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR3, true) != null;
in.close();
}
@@ -87,7 +87,7 @@ public class SerializationsTest extends AbstractSerializationsTester
// testLegacyBloomFilterWrite();
DataInputStream in = getInput("utils.LegacyBloomFilter.bin");
- assert FilterFactory.deserialize(in, FilterFactory.Type.SHA) != null;
+ assert FilterFactory.deserialize(in, FilterFactory.Type.SHA, false) != null;
in.close();
}