You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ga...@apache.org on 2017/01/13 14:32:46 UTC

kylin git commit: KYLIN-2387 A new BitmapCounter with better performance

Repository: kylin
Updated Branches:
  refs/heads/master 4b9772151 -> d19533c45


KYLIN-2387 A new BitmapCounter with better performance


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d19533c4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d19533c4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d19533c4

Branch: refs/heads/master
Commit: d19533c45414315b3ec29428edfd7c1df6d33604
Parents: 4b97721
Author: gaodayue <ga...@meituan.com>
Authored: Wed Jan 11 15:12:31 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Fri Jan 13 22:32:05 2017 +0800

----------------------------------------------------------------------
 .../common/util/ByteBufferOutputStream.java     |  43 +++++
 .../gridtable/AggregationCacheMemSizeTest.java  |  10 +-
 .../metadata/measure/MeasureCodecTest.java      |  10 +-
 .../kylin/measure/bitmap/BitmapAggregator.java  |  46 +++--
 .../kylin/measure/bitmap/BitmapCounter.java     | 181 ++++---------------
 .../bitmap/BitmapDistinctCountAggFunc.java      |  35 ++--
 .../BitmapIntersectDistinctCountAggFunc.java    |  37 ++--
 .../kylin/measure/bitmap/BitmapMeasureType.java |  52 +++---
 .../kylin/measure/bitmap/BitmapSerializer.java  |  31 +---
 .../measure/bitmap/ImmutableBitmapCounter.java  | 112 ++++++++++++
 .../measure/bitmap/MutableBitmapCounter.java    |  60 ++++++
 .../measure/AggregatorMemEstimateTest.java      |   4 +-
 .../measure/bitmap/BitmapAggregatorTest.java    |  38 ++--
 .../kylin/measure/bitmap/BitmapCounterTest.java |  88 +++++----
 .../measure/bitmap/BitmapSerializerTest.java    |  60 +++---
 15 files changed, 457 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-common/src/main/java/org/apache/kylin/common/util/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ByteBufferOutputStream.java b/core-common/src/main/java/org/apache/kylin/common/util/ByteBufferOutputStream.java
new file mode 100644
index 0000000..2e3ff07
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ByteBufferOutputStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.common.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An OutputStream whose target is a {@link ByteBuffer}.
+ */
+public class ByteBufferOutputStream extends OutputStream {
+    protected final ByteBuffer buffer;
+
+    public ByteBufferOutputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        buffer.put((byte) b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        buffer.put(b, off, len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
index b164e54..63c7672 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
@@ -33,7 +33,7 @@ import org.apache.kylin.measure.basic.BigDecimalSumAggregator;
 import org.apache.kylin.measure.basic.DoubleSumAggregator;
 import org.apache.kylin.measure.basic.LongSumAggregator;
 import org.apache.kylin.measure.bitmap.BitmapAggregator;
-import org.apache.kylin.measure.bitmap.BitmapCounter;
+import org.apache.kylin.measure.bitmap.MutableBitmapCounter;
 import org.apache.kylin.measure.hllc.HLLCAggregator;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.github.jamm.MemoryMeter;
@@ -43,13 +43,13 @@ import com.google.common.base.Stopwatch;
 
 public class AggregationCacheMemSizeTest {
     private static final MemoryMeter meter = new MemoryMeter();
-    private static final BitmapCounter[] bitmaps = new BitmapCounter[5];
+    private static final MutableBitmapCounter[] bitmaps = new MutableBitmapCounter[5];
     private static final Random random = new Random();
 
     // consider bitmaps with variant cardinality
     static {
         for (int i = 0; i < bitmaps.length; i++) {
-            bitmaps[i] = new BitmapCounter();
+            bitmaps[i] = new MutableBitmapCounter();
         }
 
         final int totalBits = 1_000_000;
@@ -116,8 +116,8 @@ public class AggregationCacheMemSizeTest {
     }
 
     private BitmapAggregator createBitmapAggr(boolean lowCardinality) {
-        BitmapCounter counter = new BitmapCounter();
-        counter.merge(lowCardinality ? bitmaps[0] : bitmaps[3]);
+        MutableBitmapCounter counter = new MutableBitmapCounter();
+        counter.orWith(lowCardinality ? bitmaps[0] : bitmaps[3]);
 
         BitmapAggregator result = new BitmapAggregator();
         result.aggregate(counter);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
index ad4d90b..97c9751 100644
--- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.bitmap.BitmapCounter;
+import org.apache.kylin.measure.bitmap.MutableBitmapCounter;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -34,7 +34,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
- * 
+ *
  */
 public class MeasureCodecTest extends LocalFileMetadataTestCase {
     @BeforeClass
@@ -58,7 +58,7 @@ public class MeasureCodecTest extends LocalFileMetadataTestCase {
         HLLCounter hllc = new HLLCounter(16);
         hllc.add("1234567");
         hllc.add("abcdefg");
-        BitmapCounter bitmap = new BitmapCounter();
+        MutableBitmapCounter bitmap = new MutableBitmapCounter();
         bitmap.add(123);
         bitmap.add(45678);
         bitmap.add(Integer.MAX_VALUE - 10);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
index cd0b4bb..2c91bfa 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
@@ -20,37 +20,49 @@ package org.apache.kylin.measure.bitmap;
 
 import org.apache.kylin.measure.MeasureAggregator;
 
-/**
- * Created by sunyerui on 15/12/2.
- */
 public class BitmapAggregator extends MeasureAggregator<BitmapCounter> {
 
-    private BitmapCounter sum = null;
+    private ImmutableBitmapCounter sum;
+    private boolean isMutable;
 
     @Override
     public void reset() {
         sum = null;
+        isMutable = false;
     }
 
     @Override
     public void aggregate(BitmapCounter value) {
+        ImmutableBitmapCounter v = (ImmutableBitmapCounter) value;
+
+        // Here we optimize for the case when a group has only 1 value. In such a situation,
+        // no aggregation is needed, so we just keep a reference to the first value, saving
+        // the cost of deserialization and merging.
         if (sum == null) {
-            sum = new BitmapCounter(value);
-        } else {
-            sum.merge(value);
+            sum = v;
+            return;
+        }
+
+        MutableBitmapCounter mutable;
+        if (!isMutable) {   // when aggregating the second value
+            mutable = sum.toMutable();
+            sum = mutable;
+            isMutable = true;
+        } else {    // for the third, fourth, ...
+            mutable = (MutableBitmapCounter) sum;
         }
+        mutable.orWith(v);
     }
 
     @Override
     public BitmapCounter aggregate(BitmapCounter value1, BitmapCounter value2) {
-        if (value1 == null) {
-            return new BitmapCounter(value2);
-        } else if (value2 == null) {
-            return new BitmapCounter(value1);
+        MutableBitmapCounter merged = new MutableBitmapCounter();
+        if (value1 != null) {
+            merged.orWith((ImmutableBitmapCounter) value1);
+        }
+        if (value2 != null) {
+            merged.orWith((ImmutableBitmapCounter) value2);
         }
-
-        BitmapCounter merged = new BitmapCounter(value1);
-        merged.merge(value2);
         return merged;
     }
 
@@ -61,10 +73,6 @@ public class BitmapAggregator extends MeasureAggregator<BitmapCounter> {
 
     @Override
     public int getMemBytesEstimate() {
-        if (sum == null) {
-            return Integer.MIN_VALUE;
-        } else {
-            return sum.getMemBytes();
-        }
+        return sum == null ? 0 : sum.getMemBytes();
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
index ac932ce..f07059c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
@@ -18,158 +18,43 @@
 
 package org.apache.kylin.measure.bitmap;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
-import org.apache.kylin.common.util.ByteBufferBackedInputStream;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-
 /**
- * Created by sunyerui on 15/12/1.
+ * An implementation-agnostic bitmap type.
  */
-public class BitmapCounter implements Comparable<BitmapCounter>, java.io.Serializable {
-
-    private MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
-
-    public BitmapCounter() {
-    }
-
-    public BitmapCounter(BitmapCounter another) {
-        merge(another);
-    }
-
-    public void clear() {
-        bitmap.clear();
-    }
-
-    public BitmapCounter clone() {
-        BitmapCounter newCounter = new BitmapCounter();
-        newCounter.bitmap = bitmap.clone();
-        return newCounter;
-    }
-
-    public void add(int value) {
-        bitmap.add(value);
-    }
-
-    public void add(String value) {
-        if (value == null || value.isEmpty()) {
-            return;
-        }
-        add(Integer.parseInt(value));
-    }
-
-    public void merge(BitmapCounter another) {
-        this.bitmap.or(another.bitmap);
-    }
-
-    public void intersect(BitmapCounter another) {
-        this.bitmap.and(another.bitmap);
-    }
-
-    public long getCount() {
-        return this.bitmap.getCardinality();
-    }
-
-    public int getMemBytes() {
-        return this.bitmap.getSizeInBytes();
-    }
-
-    public Iterator<Integer> iterator() {
-        return bitmap.iterator();
-    }
-
-    public void writeRegisters(ByteBuffer out) throws IOException {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        bitmap.runOptimize();
-        bitmap.serialize(dos);
-        dos.close();
-        ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
-        out.put(bb);
-    }
-
-    public void readRegisters(ByteBuffer in) throws IOException {
-        try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(in))) {
-            bitmap.deserialize(is);
-        }
-    }
-
-    @Override
-    public String toString() {
-        long count = getCount();
-        if (count <= 10) {
-            return "(" + count + ")" + bitmap.toString();
-        } else {
-            StringBuilder sb = new StringBuilder();
-            sb.append("(").append(count).append("){");
-            int values = 0;
-            for (Integer v : bitmap) {
-                if (values++ < 10) {
-                    sb.append(v).append(",");
-                } else {
-                    sb.append("...");
-                    break;
-                }
-            }
-            sb.append("}");
-            return sb.toString();
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + bitmap.hashCode();
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        BitmapCounter other = (BitmapCounter) obj;
-        return bitmap.equals(other.bitmap);
-    }
-
-    @Override
-    public int compareTo(BitmapCounter o) {
-        if (o == null)
-            return 1;
-
-        long e1 = this.getCount();
-        long e2 = o.getCount();
-
-        if (e1 == e2)
-            return 0;
-        else if (e1 > e2)
-            return 1;
-        else
-            return -1;
-    }
-
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-        int len;
-
-        MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
-        try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(in))) {
-            bitmap.deserialize(is);
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-
-        len = in.position() - mark;
-        in.position(mark);
-        return len;
-    }
+public interface BitmapCounter extends Iterable<Integer> {
+    /**
+     * @return cardinality of the bitmap
+     */
+    long getCount();
+
+    /**
+     * @return estimated memory footprint of this counter
+     */
+    int getMemBytes();
+
+    /**
+     * @return an iterator over the ints stored in this counter.
+     */
+    Iterator<Integer> iterator();
+
+    /**
+     * Serialize this counter. The current counter is not modified.
+     */
+    void serialize(ByteBuffer out) throws IOException;
+
+    /**
+     * Deserialize a counter from its serialized form.
+     * <p> After deserialization, any changes to `in` should not affect the returned counter.
+     */
+    BitmapCounter deserialize(ByteBuffer in) throws IOException;
+
+    /**
+     * @return size of the counter stored at the current position of `in`.
+     * The position of `in` must not be modified.
+     */
+    int peekLength(ByteBuffer in);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java
index d039b6d..66215e1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapDistinctCountAggFunc.java
@@ -18,35 +18,30 @@
 
 package org.apache.kylin.measure.bitmap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
- * Created by sunyerui on 15/12/22.
+ * Bitmap-based distinct count UDAF, called by calcite runtime.
  */
 public class BitmapDistinctCountAggFunc {
 
-    private static final Logger logger = LoggerFactory.getLogger(BitmapDistinctCountAggFunc.class);
-
-    public static BitmapCounter init() {
-        return null;
+    public static BitmapAggregator init() {
+        return new BitmapAggregator();
     }
 
-    public static BitmapCounter add(BitmapCounter counter, Object v) {
-        BitmapCounter c = (BitmapCounter) v;
-        if (counter == null) {
-            return new BitmapCounter(c);
-        } else {
-            counter.merge(c);
-            return counter;
-        }
+    public static BitmapAggregator add(BitmapAggregator agg, Object value) {
+        agg.aggregate((BitmapCounter) value);
+        return agg;
     }
 
-    public static BitmapCounter merge(BitmapCounter counter0, Object counter1) {
-        return add(counter0, counter1);
+    public static BitmapAggregator merge(BitmapAggregator agg, Object value) {
+        BitmapAggregator agg2 = (BitmapAggregator) value;
+        if (agg2.getState() == null) {
+            return agg;
+        }
+        return add(agg, agg2.getState());
     }
 
-    public static long result(BitmapCounter counter) {
-        return counter == null ? 0L : counter.getCount();
+    public static long result(BitmapAggregator agg) {
+        BitmapCounter finalState = agg.getState();
+        return finalState == null ? 0 : finalState.getCount();
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
index cf42d1b..dcdf945 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -17,9 +17,6 @@
 */
 package org.apache.kylin.measure.bitmap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,10 +28,9 @@ import java.util.Map;
  *          requires an bitmap count distinct measure of uuid, and an dimension of event
  */
 public class BitmapIntersectDistinctCountAggFunc {
-    private static final Logger logger = LoggerFactory.getLogger(BitmapIntersectDistinctCountAggFunc.class);
 
     public static class RetentionPartialResult {
-        Map<Object, BitmapCounter> map;
+        Map<Object, MutableBitmapCounter> map;
         List keyList;
 
         public RetentionPartialResult() {
@@ -45,29 +41,34 @@ public class BitmapIntersectDistinctCountAggFunc {
             if (this.keyList == null) {
                 this.keyList = keyList;
             }
-            BitmapCounter counter = map.get(key);
-            if (counter == null) {
-                counter = new BitmapCounter();
-                map.put(key, counter);
+            if (this.keyList != null && this.keyList.contains(key)) {
+                MutableBitmapCounter counter = map.get(key);
+                if (counter == null) {
+                    counter = new MutableBitmapCounter();
+                    map.put(key, counter);
+                }
+                counter.orWith((ImmutableBitmapCounter) value);
             }
-            counter.merge((BitmapCounter)value);
         }
 
         public long result() {
             if (keyList == null || keyList.isEmpty()) {
                 return 0;
             }
-            BitmapCounter counter = null;
+            // if any specified key not in map, the intersection must be 0
             for (Object key : keyList) {
-                BitmapCounter c = map.get(key);
-                if (c == null) {
-                    // We have a key in filter list but not in map, meaning there's no intersect data
+                if (!map.containsKey(key)) {
                     return 0;
+                }
+            }
+            MutableBitmapCounter counter = null;
+            for (Object key : keyList) {
+                MutableBitmapCounter c = map.get(key);
+                if (counter == null) {
+                    counter = new MutableBitmapCounter();
+                    counter.orWith(c);
                 } else {
-                    if (counter == null) {
-                        counter = c.clone();
-                    }
-                    counter.intersect(c);
+                    counter.andWith(c);
                 }
             }
             return counter.getCount();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 6ad82a1..b6f1975 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -18,8 +18,8 @@
 
 package org.apache.kylin.measure.bitmap;
 
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -38,6 +38,8 @@ import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
 
 import com.google.common.collect.ImmutableMap;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 /**
  * Created by sunyerui on 15/12/10.
  */
@@ -77,11 +79,14 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
 
     @Override
     public void validate(FunctionDesc functionDesc) throws IllegalArgumentException {
-        if (FUNC_COUNT_DISTINCT.equals(functionDesc.getExpression()) == false)
-            throw new IllegalArgumentException("BitmapMeasureType func is not " + FUNC_COUNT_DISTINCT + " but " + functionDesc.getExpression());
-
-        if (DATATYPE_BITMAP.equals(functionDesc.getReturnDataType().getName()) == false)
-            throw new IllegalArgumentException("BitmapMeasureType datatype is not " + DATATYPE_BITMAP + " but " + functionDesc.getReturnDataType().getName());
+        checkArgument(FUNC_COUNT_DISTINCT.equals(functionDesc.getExpression()),
+                "BitmapMeasureType only support function %s, got %s", FUNC_COUNT_DISTINCT, functionDesc.getExpression());
+        checkArgument(functionDesc.getParameterCount() == 1, // Guava templates substitute only %s, never %d
+                "BitmapMeasureType only support 1 parameter, got %s", functionDesc.getParameterCount());
+
+        String returnType = functionDesc.getReturnDataType().getName();
+        checkArgument(DATATYPE_BITMAP.equals(returnType),
+                "BitmapMeasureType's return type must be %s, got %s", DATATYPE_BITMAP, returnType);
     }
 
     @Override
@@ -92,24 +97,29 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
     @Override
     public MeasureIngester<BitmapCounter> newIngester() {
         return new MeasureIngester<BitmapCounter>() {
-            BitmapCounter current = new BitmapCounter();
+            MutableBitmapCounter current = new MutableBitmapCounter();
 
             @Override
             public BitmapCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
-                BitmapCounter bitmap = current;
+                checkArgument(values.length == 1, "expect 1 value, got %s", Arrays.toString(values));
+
+                MutableBitmapCounter bitmap = current;
                 bitmap.clear();
+
+                if (values[0] == null) {
+                    return bitmap;
+                }
+
+                int id;
                 if (needDictionaryColumn(measureDesc.getFunction())) {
                     TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0);
                     Dictionary<String> dictionary = dictionaryMap.get(literalCol);
-                    if (values != null && values.length > 0 && values[0] != null) {
-                        int id = dictionary.getIdFromValue(values[0]);
-                        bitmap.add(id);
-                    }
+                    id = dictionary.getIdFromValue(values[0]);
                 } else {
-                    for (String value : values) {
-                        bitmap.add(value);
-                    }
+                    id = Integer.parseInt(values[0]);
                 }
+
+                bitmap.add(id);
                 return bitmap;
             }
 
@@ -122,11 +132,9 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
                 Dictionary<String> sourceDict = oldDicts.get(colRef);
                 Dictionary<String> mergedDict = newDicts.get(colRef);
 
-                BitmapCounter retValue = new BitmapCounter();
+                MutableBitmapCounter retValue = new MutableBitmapCounter();
                 byte[] literal = new byte[sourceDict.getSizeOfValue()];
-                Iterator<Integer> iterator = value.iterator();
-                while (iterator.hasNext()) {
-                    int id = iterator.next();
+                for (int id : value) {
                     int newId;
                     int size = sourceDict.getValueBytesFromId(id, literal, 0);
                     if (size < 0) {
@@ -141,7 +149,7 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
 
             @Override
             public void reset() {
-                current = new BitmapCounter();
+                current = new MutableBitmapCounter();
             }
         };
     }
@@ -174,8 +182,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
         return true;
     }
 
-    static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(//
-            FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class, //
+    static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.of(
+            FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class,
             FUNC_INTERSECT_COUNT_DISTINCT, BitmapIntersectDistinctCountAggFunc.class);
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index 089d18c..8b15d1c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -18,54 +18,41 @@
 
 package org.apache.kylin.measure.bitmap;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
-/**
- * Created by sunyerui on 15/12/1.
- */
-public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
-    private ThreadLocal<BitmapCounter> current = new ThreadLocal<>();
+public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
+    private static final BitmapCounter DELEGATE = new ImmutableBitmapCounter();
 
+    // called by reflection
     public BitmapSerializer(DataType type) {
     }
 
     @Override
     public void serialize(BitmapCounter value, ByteBuffer out) {
         try {
-            value.writeRegisters(out);
+            value.serialize(out);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
 
-    private BitmapCounter current() {
-        BitmapCounter counter = current.get();
-        if (counter == null) {
-            counter = new BitmapCounter();
-            current.set(counter);
-        }
-        return counter;
-    }
-
     @Override
     public BitmapCounter deserialize(ByteBuffer in) {
-        BitmapCounter counter = current();
+
         try {
-            counter.readRegisters(in);
+            return DELEGATE.deserialize(in);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return counter;
     }
 
     @Override
     public int peekLength(ByteBuffer in) {
-        return current().peekLength(in);
+        return DELEGATE.peekLength(in);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java
new file mode 100644
index 0000000..5c39a4a
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.bitmap;
+
+import org.apache.kylin.common.util.ByteBufferOutputStream;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * A thin wrapper around {@link ImmutableRoaringBitmap}.
+ */
+public class ImmutableBitmapCounter implements BitmapCounter {
+
+    protected ImmutableRoaringBitmap bitmap;
+
+    public ImmutableBitmapCounter() {
+        this(ImmutableRoaringBitmap.bitmapOf());
+    }
+
+    public ImmutableBitmapCounter(ImmutableRoaringBitmap bitmap) {
+        this.bitmap = bitmap;
+    }
+
+    @Override
+    public long getCount() {
+        return bitmap.getCardinality();
+    }
+
+    @Override
+    public int getMemBytes() {
+        return bitmap.getSizeInBytes();
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return bitmap.iterator();
+    }
+
+    @Override
+    public void serialize(ByteBuffer out) throws IOException {
+        if (out.remaining() < bitmap.serializedSizeInBytes()) {
+            throw new BufferOverflowException();
+        }
+        bitmap.serialize(new DataOutputStream(new ByteBufferOutputStream(out)));
+    }
+
+    @Override
+    public BitmapCounter deserialize(ByteBuffer in) throws IOException {
+        int size = peekLength(in);
+        // make a copy of the content to be safe
+        byte[] dst = new byte[size];
+        in.get(dst);
+
+        // just map the buffer, which is faster than deserializing it
+        ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(ByteBuffer.wrap(dst));
+        return new ImmutableBitmapCounter(bitmap);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        // only look at the metadata of the bitmap, no deserialization happens
+        ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(in);
+        return bitmap.serializedSizeInBytes();
+    }
+
+    /**
+     * Copies the content of this counter to a counter that can be modified.
+     * @return a mutable counter
+     */
+    public MutableBitmapCounter toMutable() {
+        MutableBitmapCounter mutable = new MutableBitmapCounter();
+        mutable.orWith(this);
+        return mutable;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return (obj instanceof ImmutableBitmapCounter) &&
+                bitmap.equals(((ImmutableBitmapCounter) obj).bitmap);
+    }
+
+    @Override
+    public int hashCode() {
+        return bitmap.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "BitmapCounter[" + getCount() + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java
new file mode 100644
index 0000000..af01790
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.bitmap;
+
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A thin wrapper around {@link MutableRoaringBitmap}.
+ */
+public class MutableBitmapCounter extends ImmutableBitmapCounter {
+
+    public MutableBitmapCounter() {
+        super(new MutableRoaringBitmap());
+    }
+
+    private MutableRoaringBitmap getBitmap() {
+        return (MutableRoaringBitmap) bitmap;
+    }
+
+    public void clear() {
+        getBitmap().clear();
+    }
+
+    public void add(int value) {
+        getBitmap().add(value);
+    }
+
+    public void orWith(ImmutableBitmapCounter another) {
+        getBitmap().or(another.bitmap);
+    }
+
+    public void andWith(ImmutableBitmapCounter another) {
+        getBitmap().and(another.bitmap);
+    }
+
+    @Override
+    public void serialize(ByteBuffer out) throws IOException {
+        getBitmap().runOptimize();
+        super.serialize(out);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
index 3d48ac2..39921c2 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
@@ -33,7 +33,7 @@ import org.apache.kylin.measure.basic.LongMaxAggregator;
 import org.apache.kylin.measure.basic.LongMinAggregator;
 import org.apache.kylin.measure.basic.LongSumAggregator;
 import org.apache.kylin.measure.bitmap.BitmapAggregator;
-import org.apache.kylin.measure.bitmap.BitmapCounter;
+import org.apache.kylin.measure.bitmap.MutableBitmapCounter;
 import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
 import org.apache.kylin.measure.hllc.HLLCAggregator;
 import org.apache.kylin.measure.hllc.HLLCounter;
@@ -104,7 +104,7 @@ public class AggregatorMemEstimateTest extends LocalFileMetadataTestCase {
         hllcAggregator.aggregate(new HLLCounter(14));
 
         BitmapAggregator bitmapAggregator = new BitmapAggregator();
-        BitmapCounter bitmapCounter = new BitmapCounter();
+        MutableBitmapCounter bitmapCounter = new MutableBitmapCounter();
         for (int i = 4000; i <= 100000; i += 2) {
             bitmapCounter.add(i);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java
index e216d0b..03eb53a 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java
@@ -18,39 +18,29 @@
 
 package org.apache.kylin.measure.bitmap;
 
+import org.junit.Test;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-import org.junit.Test;
-
-/**
- * Created by sunyerui on 15/12/31.
- */
 public class BitmapAggregatorTest {
 
     @Test
     public void testAggregator() {
-        BitmapCounter counter = new BitmapCounter();
-        counter.add(1);
-        counter.add(3333);
-        counter.add("123");
-        counter.add(123);
-        assertEquals(3, counter.getCount());
-
-        BitmapCounter counter2 = new BitmapCounter();
-        counter2.add("23456");
-        counter2.add(12273456);
-        counter2.add("4258");
-        counter2.add(123);
-        assertEquals(4, counter2.getCount());
-
         BitmapAggregator aggregator = new BitmapAggregator();
-        assertNull(aggregator.getState());
-        assertEquals(Integer.MIN_VALUE, aggregator.getMemBytesEstimate());
+        assertNull(null, aggregator.getState());
+
+        aggregator.aggregate(new ImmutableBitmapCounter(
+                ImmutableRoaringBitmap.bitmapOf(10, 20, 30, 40)
+        ));
+        assertEquals(4, aggregator.getState().getCount());
+
+        aggregator.aggregate(new ImmutableBitmapCounter(
+                ImmutableRoaringBitmap.bitmapOf(25, 30, 35, 40, 45)
+        ));
+        assertEquals(7, aggregator.getState().getCount());
 
-        aggregator.aggregate(counter);
-        aggregator.aggregate(counter2);
-        assertEquals(6, aggregator.getState().getCount());
         aggregator.reset();
         assertNull(aggregator.getState());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java
index c9c1b51..70e4ecc 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java
@@ -18,58 +18,66 @@
 
 package org.apache.kylin.measure.bitmap;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.junit.Test;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-/**
- * Created by sunyerui on 15/12/31.
- */
 public class BitmapCounterTest {
 
     @Test
-    public void testAddAndMergeValues() {
-        BitmapCounter counter = new BitmapCounter();
-        counter.add(1);
-        counter.add(3333);
-        counter.add("123");
-        counter.add(123);
-        assertEquals(3, counter.getCount());
+    public void testBitmapCounter() {
+        ImmutableRoaringBitmap bitmap = ImmutableRoaringBitmap.bitmapOf(10, 20, 30, 1000);
+        ImmutableBitmapCounter counter = new ImmutableBitmapCounter(bitmap);
+        assertEquals(4, counter.getCount());
+        assertTrue(counter.getMemBytes() > 0);
 
-        BitmapCounter counter2 = new BitmapCounter();
-        counter2.add("23456");
-        counter2.add(12273456);
-        counter2.add("4258");
-        counter2.add(123);
-        counter2.add(-2147483648);
-        counter2.add(-2);
-        assertEquals(6, counter2.getCount());
+        MutableBitmapCounter counter2 = new MutableBitmapCounter();
+        assertEquals(0, counter2.getCount());
+        counter2.add(10);
+        counter2.add(30);
+        counter2.add(40);
+        counter2.add(2000);
+        assertEquals(4, counter2.getCount());
 
-        counter.merge(counter2);
-        assertEquals(8, counter.getCount());
-        System.out.print("counter size: " + counter.getMemBytes() + ", counter2 size: " + counter2.getMemBytes());
+        counter2.orWith(counter);
+        assertEquals(4, counter.getCount());
+        assertEquals(6, counter2.getCount());  // in-place change
+
+        int i = 0;
+        int[] values = new int[(int) counter2.getCount()];
+        for (int value : counter2) {
+            values[i++] = value;
+        }
+        assertArrayEquals(new int[]{10, 20, 30, 40, 1000, 2000}, values);
+
+        counter2.clear();
+        assertEquals(0, counter2.getCount());
     }
 
     @Test
-    public void testSerDeCounter() throws IOException {
-        BitmapCounter counter = new BitmapCounter();
-        for (int i = 1; i < 1000; i++) {
-            counter.add(i);
-        }
-        ByteBuffer buffer = ByteBuffer.allocate(10 * 1024 * 1024);
-        counter.writeRegisters(buffer);
-        int len = buffer.position();
+    public void testToMutableBitmapCounter() {
+        ImmutableRoaringBitmap bitmap = ImmutableRoaringBitmap.bitmapOf(10, 20, 30, 1000);
+        ImmutableBitmapCounter immutable = new ImmutableBitmapCounter(bitmap);
+        MutableBitmapCounter mutable = new MutableBitmapCounter();
+        mutable.orWith(immutable);
+
+        assertEquals(4, immutable.getCount());
+        assertEquals(4, mutable.getCount());
+        assertTrue(immutable.equals(mutable));
+        assertTrue(mutable.equals(immutable));
 
-        buffer.position(0);
-        assertEquals(len, counter.peekLength(buffer));
-        assertEquals(0, buffer.position());
+        MutableBitmapCounter newCounter = immutable.toMutable();
+        newCounter.add(40);
+        assertEquals(4, immutable.getCount());
+        assertEquals(5, newCounter.getCount());
 
-        BitmapCounter counter2 = new BitmapCounter();
-        counter2.readRegisters(buffer);
-        assertEquals(999, counter2.getCount());
+        newCounter = mutable.toMutable();
+        newCounter.add(40);
+        assertEquals(4, mutable.getCount());
+        assertEquals(5, newCounter.getCount());
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d19533c4/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java
index 41efb2c..71fcae6 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java
@@ -18,19 +18,20 @@
 
 package org.apache.kylin.measure.bitmap;
 
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-/**
- * Created by sunyerui on 15/12/31.
- */
 public class BitmapSerializerTest extends LocalFileMetadataTestCase {
     @BeforeClass
     public static void setUp() throws Exception {
@@ -43,27 +44,36 @@ public class BitmapSerializerTest extends LocalFileMetadataTestCase {
     }
 
     @Test
-    public void testSerDeCounter() {
-        BitmapCounter counter = new BitmapCounter();
-        counter.add(1);
-        counter.add(3333);
-        counter.add("123");
-        counter.add(123);
-        assertEquals(3, counter.getCount());
-
-        ByteBuffer buffer = ByteBuffer.allocate(10 * 1024 * 1024);
+    public void testBitmapSerDe() {
         BitmapSerializer serializer = new BitmapSerializer(DataType.ANY);
+
+        ImmutableRoaringBitmap bitmap = ImmutableRoaringBitmap.bitmapOf(1, 1234, 5678, 100000);
+        ImmutableBitmapCounter counter = new ImmutableBitmapCounter(bitmap);
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
         serializer.serialize(counter, buffer);
-        int len = buffer.position();
+        int size = buffer.position();
+        buffer.flip();
+
+        assertEquals(size, serializer.peekLength(buffer));
+        assertEquals(0, buffer.position()); // peek doesn't change buffer
+
+        BitmapCounter counter2 = serializer.deserialize(buffer);
+        assertEquals(size, buffer.position()); // deserialize advances the position to the next record
+        assertEquals(4, counter2.getCount());
 
-        buffer.position(0);
-        BitmapSerializer deSerializer = new BitmapSerializer(DataType.ANY);
-        BitmapCounter counter2 = deSerializer.deserialize(buffer);
-        assertEquals(3, counter2.getCount());
+        buffer.flip();
+        for (int i = 0; i < size; i++) {
+            buffer.put((byte) 0); // clear buffer content
+        }
+        assertEquals(4, counter2.getCount());
 
-        buffer.position(0);
-        assertEquals(len, deSerializer.peekLength(buffer));
-        assertEquals(8 * 1024 * 1024, deSerializer.maxLength());
-        System.out.println("counter size " + deSerializer.getStorageBytesEstimate());
+        buffer = ByteBuffer.allocate(size - 1);
+        try {
+            serializer.serialize(counter, buffer);
+            Assert.fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof BufferOverflowException);
+        }
     }
 }
\ No newline at end of file