You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/04 04:43:36 UTC

[07/47] kylin git commit: KYLIN-2387 code refactor, merge ImmutableBitmapCounter and MutableBitmapCounter into RoaringBitmapCounter

KYLIN-2387 code refactor, merge ImmutableBitmapCounter and MutableBitmapCounter into RoaringBitmapCounter


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/38c3e7bf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/38c3e7bf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/38c3e7bf

Branch: refs/heads/KYLIN-2361
Commit: 38c3e7bf691ecdfd0f8d42fcc97065a0596be018
Parents: 61833d9
Author: gaodayue <ga...@meituan.com>
Authored: Thu Jan 19 15:23:34 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Thu Jan 19 15:43:14 2017 +0800

----------------------------------------------------------------------
 .../gridtable/AggregationCacheMemSizeTest.java  |  11 +-
 .../metadata/measure/MeasureCodecTest.java      |   5 +-
 .../kylin/measure/bitmap/BitmapAggregator.java  |  25 +---
 .../kylin/measure/bitmap/BitmapCounter.java     |  28 +++-
 .../measure/bitmap/BitmapCounterFactory.java    |  30 ++++
 .../BitmapIntersectDistinctCountAggFunc.java    |  16 +-
 .../kylin/measure/bitmap/BitmapMeasureType.java |  17 ++-
 .../kylin/measure/bitmap/BitmapSerializer.java  |   9 +-
 .../measure/bitmap/ImmutableBitmapCounter.java  | 108 --------------
 .../measure/bitmap/MutableBitmapCounter.java    |  60 --------
 .../measure/bitmap/RoaringBitmapCounter.java    | 147 +++++++++++++++++++
 .../bitmap/RoaringBitmapCounterFactory.java     |  47 ++++++
 .../measure/AggregatorMemEstimateTest.java      |   5 +-
 .../measure/bitmap/BitmapAggregatorTest.java    |  56 ++++++-
 .../kylin/measure/bitmap/BitmapCounterTest.java |  32 +---
 .../measure/bitmap/BitmapSerializerTest.java    |   5 +-
 16 files changed, 345 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
index 63c7672..f749fb4 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
@@ -33,7 +33,9 @@ import org.apache.kylin.measure.basic.BigDecimalSumAggregator;
 import org.apache.kylin.measure.basic.DoubleSumAggregator;
 import org.apache.kylin.measure.basic.LongSumAggregator;
 import org.apache.kylin.measure.bitmap.BitmapAggregator;
-import org.apache.kylin.measure.bitmap.MutableBitmapCounter;
+import org.apache.kylin.measure.bitmap.BitmapCounter;
+import org.apache.kylin.measure.bitmap.BitmapCounterFactory;
+import org.apache.kylin.measure.bitmap.RoaringBitmapCounterFactory;
 import org.apache.kylin.measure.hllc.HLLCAggregator;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.github.jamm.MemoryMeter;
@@ -43,13 +45,14 @@ import com.google.common.base.Stopwatch;
 
 public class AggregationCacheMemSizeTest {
     private static final MemoryMeter meter = new MemoryMeter();
-    private static final MutableBitmapCounter[] bitmaps = new MutableBitmapCounter[5];
+    private static final BitmapCounterFactory bitmapFactory = RoaringBitmapCounterFactory.INSTANCE;
+    private static final BitmapCounter[] bitmaps = new BitmapCounter[5];
     private static final Random random = new Random();
 
     // consider bitmaps with variant cardinality
     static {
         for (int i = 0; i < bitmaps.length; i++) {
-            bitmaps[i] = new MutableBitmapCounter();
+            bitmaps[i] = bitmapFactory.newBitmap();
         }
 
         final int totalBits = 1_000_000;
@@ -116,7 +119,7 @@ public class AggregationCacheMemSizeTest {
     }
 
     private BitmapAggregator createBitmapAggr(boolean lowCardinality) {
-        MutableBitmapCounter counter = new MutableBitmapCounter();
+        BitmapCounter counter = bitmapFactory.newBitmap();
         counter.orWith(lowCardinality ? bitmaps[0] : bitmaps[3]);
 
         BitmapAggregator result = new BitmapAggregator();

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
index 97c9751..7129a5e 100644
--- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
@@ -25,7 +25,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.bitmap.MutableBitmapCounter;
+import org.apache.kylin.measure.bitmap.BitmapCounter;
+import org.apache.kylin.measure.bitmap.RoaringBitmapCounterFactory;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -58,7 +59,7 @@ public class MeasureCodecTest extends LocalFileMetadataTestCase {
         HLLCounter hllc = new HLLCounter(16);
         hllc.add("1234567");
         hllc.add("abcdefg");
-        MutableBitmapCounter bitmap = new MutableBitmapCounter();
+        BitmapCounter bitmap = RoaringBitmapCounterFactory.INSTANCE.newBitmap();
         bitmap.add(123);
         bitmap.add(45678);
         bitmap.add(Integer.MAX_VALUE - 10);

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
index 2c91bfa..d57af48 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
@@ -21,47 +21,36 @@ package org.apache.kylin.measure.bitmap;
 import org.apache.kylin.measure.MeasureAggregator;
 
 public class BitmapAggregator extends MeasureAggregator<BitmapCounter> {
+    private static final BitmapCounterFactory bitmapFactory = RoaringBitmapCounterFactory.INSTANCE;
 
-    private ImmutableBitmapCounter sum;
-    private boolean isMutable;
+    private BitmapCounter sum;
 
     @Override
     public void reset() {
         sum = null;
-        isMutable = false;
     }
 
     @Override
     public void aggregate(BitmapCounter value) {
-        ImmutableBitmapCounter v = (ImmutableBitmapCounter) value;
-
         // Here we optimize for case when group only has 1 value. In such situation, no
         // aggregation is needed, so we just keep a reference to the first value, saving
         // the cost of deserialization and merging.
         if (sum == null) {
-            sum = v;
+            sum = value;
             return;
         }
 
-        MutableBitmapCounter mutable;
-        if (!isMutable) {   // when aggregate the second value
-            mutable = sum.toMutable();
-            sum = mutable;
-            isMutable = true;
-        } else {    // for the third, forth, ...
-            mutable = (MutableBitmapCounter) sum;
-        }
-        mutable.orWith(v);
+        sum.orWith(value);
     }
 
     @Override
     public BitmapCounter aggregate(BitmapCounter value1, BitmapCounter value2) {
-        MutableBitmapCounter merged = new MutableBitmapCounter();
+        BitmapCounter merged = bitmapFactory.newBitmap();
         if (value1 != null) {
-            merged.orWith((ImmutableBitmapCounter) value1);
+            merged.orWith(value1);
         }
         if (value2 != null) {
-            merged.orWith((ImmutableBitmapCounter) value2);
+            merged.orWith(value2);
         }
         return merged;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
index f07059c..0854b6d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
@@ -26,6 +26,30 @@ import java.util.Iterator;
  * An implementation-agnostic bitmap type.
  */
 public interface BitmapCounter extends Iterable<Integer> {
+
+    /**
+     * Add the value to the bitmap (set the value to "true"), whether it already appears or not.
+     * @param value integer value
+     */
+    void add(int value);
+
+    /**
+     * In-place bitwise OR (union) operation. The current bitmap is modified.
+     * @param another other bitmap
+     */
+    void orWith(BitmapCounter another);
+
+    /**
+     * In-place bitwise AND (intersection) operation. The current bitmap is modified.
+     * @param another other bitmap
+     */
+    void andWith(BitmapCounter another);
+
+    /**
+     * reset to an empty bitmap
+     */
+    void clear();
+
     /**
      * @return cardinality of the bitmap
      */
@@ -44,13 +68,13 @@ public interface BitmapCounter extends Iterable<Integer> {
     /**
      * Serialize this counter. The current counter is not modified.
      */
-    void serialize(ByteBuffer out) throws IOException;
+    void write(ByteBuffer out) throws IOException;
 
     /**
      * Deserialize a counter from its serialized form.
      * <p> After deserialize, any changes to `in` should not affect the returned counter.
      */
-    BitmapCounter deserialize(ByteBuffer in) throws IOException;
+    void readFields(ByteBuffer in) throws IOException;
 
     /**
      * @return size of the counter stored in the current position of `in`.

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
new file mode 100644
index 0000000..da7748e
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.bitmap;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface BitmapCounterFactory {
+    BitmapCounter newBitmap();
+
+    BitmapCounter newBitmap(int... values);
+
+    BitmapCounter newBitmap(ByteBuffer in) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
index dcdf945..cd4d306 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -28,9 +28,10 @@ import java.util.Map;
  *          requires an bitmap count distinct measure of uuid, and an dimension of event
  */
 public class BitmapIntersectDistinctCountAggFunc {
+    private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
 
     public static class RetentionPartialResult {
-        Map<Object, MutableBitmapCounter> map;
+        Map<Object, BitmapCounter> map;
         List keyList;
 
         public RetentionPartialResult() {
@@ -42,12 +43,11 @@ public class BitmapIntersectDistinctCountAggFunc {
                 this.keyList = keyList;
             }
             if (this.keyList != null && this.keyList.contains(key)) {
-                MutableBitmapCounter counter = map.get(key);
+                BitmapCounter counter = map.get(key);
                 if (counter == null) {
-                    counter = new MutableBitmapCounter();
-                    map.put(key, counter);
+                    map.put(key, counter = factory.newBitmap());
                 }
-                counter.orWith((ImmutableBitmapCounter) value);
+                counter.orWith((BitmapCounter) value);
             }
         }
 
@@ -61,11 +61,11 @@ public class BitmapIntersectDistinctCountAggFunc {
                     return 0;
                 }
             }
-            MutableBitmapCounter counter = null;
+            BitmapCounter counter = null;
             for (Object key : keyList) {
-                MutableBitmapCounter c = map.get(key);
+                BitmapCounter c = map.get(key);
                 if (counter == null) {
-                    counter = new MutableBitmapCounter();
+                    counter = factory.newBitmap();
                     counter.orWith(c);
                 } else {
                     counter.andWith(c);

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index de2a34a..e4fb079 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -96,18 +96,19 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
 
     @Override
     public MeasureIngester<BitmapCounter> newIngester() {
+        final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
+
         return new MeasureIngester<BitmapCounter>() {
-            MutableBitmapCounter current = new MutableBitmapCounter();
+            BitmapCounter current = factory.newBitmap();
 
             @Override
             public BitmapCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
                 checkArgument(values.length == 1, "expect 1 value, got %s", Arrays.toString(values));
 
-                MutableBitmapCounter bitmap = current;
-                bitmap.clear();
+                current.clear();
 
                 if (values[0] == null) {
-                    return bitmap;
+                    return current;
                 }
 
                 int id;
@@ -119,8 +120,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
                     id = Integer.parseInt(values[0]);
                 }
 
-                bitmap.add(id);
-                return bitmap;
+                current.add(id);
+                return current;
             }
 
             @Override
@@ -132,7 +133,7 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
                 Dictionary<String> sourceDict = oldDicts.get(colRef);
                 Dictionary<String> mergedDict = newDicts.get(colRef);
 
-                MutableBitmapCounter retValue = new MutableBitmapCounter();
+                BitmapCounter retValue = factory.newBitmap();
                 for (int id : value) {
                     int newId;
                     String v = sourceDict.getValueFromId(id);
@@ -148,7 +149,7 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
 
             @Override
             public void reset() {
-                current = new MutableBitmapCounter();
+                current = factory.newBitmap();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index 0e970de..c1b260d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -25,7 +25,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
-    private static final BitmapCounter DELEGATE = new MutableBitmapCounter();
+    private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
+    private static final BitmapCounter DELEGATE = factory.newBitmap();
 
     // called by reflection
     public BitmapSerializer(DataType type) {
@@ -34,7 +35,7 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
     @Override
     public void serialize(BitmapCounter value, ByteBuffer out) {
         try {
-            value.serialize(out);
+            value.write(out);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -42,9 +43,9 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
 
     @Override
     public BitmapCounter deserialize(ByteBuffer in) {
-
         try {
-            return DELEGATE.deserialize(in);
+            return factory.newBitmap(in);
+
         } catch (IOException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java
deleted file mode 100644
index 753f089..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/ImmutableBitmapCounter.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.measure.bitmap;
-
-import org.apache.kylin.common.util.ByteBufferOutputStream;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-/**
- * A thin wrapper around {@link ImmutableRoaringBitmap}.
- */
-public class ImmutableBitmapCounter implements BitmapCounter {
-
-    protected ImmutableRoaringBitmap bitmap;
-
-    public ImmutableBitmapCounter(ImmutableRoaringBitmap bitmap) {
-        this.bitmap = bitmap;
-    }
-
-    @Override
-    public long getCount() {
-        return bitmap.getCardinality();
-    }
-
-    @Override
-    public int getMemBytes() {
-        return bitmap.getSizeInBytes();
-    }
-
-    @Override
-    public Iterator<Integer> iterator() {
-        return bitmap.iterator();
-    }
-
-    @Override
-    public void serialize(ByteBuffer out) throws IOException {
-        if (out.remaining() < bitmap.serializedSizeInBytes()) {
-            throw new BufferOverflowException();
-        }
-        bitmap.serialize(new DataOutputStream(new ByteBufferOutputStream(out)));
-    }
-
-    @Override
-    public BitmapCounter deserialize(ByteBuffer in) throws IOException {
-        int size = peekLength(in);
-        // make a copy of the content to be safe
-        byte[] dst = new byte[size];
-        in.get(dst);
-
-        // just map the buffer, faster than deserialize
-        ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(ByteBuffer.wrap(dst));
-        return new ImmutableBitmapCounter(bitmap);
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        // only look at the metadata of the bitmap, no deserialization happens
-        ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(in);
-        return bitmap.serializedSizeInBytes();
-    }
-
-    /**
-     * Copies the content of this counter to a counter that can be modified.
-     * @return a mutable counter
-     */
-    public MutableBitmapCounter toMutable() {
-        MutableBitmapCounter mutable = new MutableBitmapCounter();
-        mutable.orWith(this);
-        return mutable;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        return (obj instanceof ImmutableBitmapCounter) &&
-                bitmap.equals(((ImmutableBitmapCounter) obj).bitmap);
-    }
-
-    @Override
-    public int hashCode() {
-        return bitmap.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return "BitmapCounter[" + getCount() + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java
deleted file mode 100644
index af01790..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/MutableBitmapCounter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.measure.bitmap;
-
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * A thin wrapper around {@link MutableRoaringBitmap}.
- */
-public class MutableBitmapCounter extends ImmutableBitmapCounter {
-
-    public MutableBitmapCounter() {
-        super(new MutableRoaringBitmap());
-    }
-
-    private MutableRoaringBitmap getBitmap() {
-        return (MutableRoaringBitmap) bitmap;
-    }
-
-    public void clear() {
-        getBitmap().clear();
-    }
-
-    public void add(int value) {
-        getBitmap().add(value);
-    }
-
-    public void orWith(ImmutableBitmapCounter another) {
-        getBitmap().or(another.bitmap);
-    }
-
-    public void andWith(ImmutableBitmapCounter another) {
-        getBitmap().and(another.bitmap);
-    }
-
-    @Override
-    public void serialize(ByteBuffer out) throws IOException {
-        getBitmap().runOptimize();
-        super.serialize(out);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
new file mode 100644
index 0000000..cd07d20
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.bitmap;
+
+import org.apache.kylin.common.util.ByteBufferOutputStream;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * A {@link BitmapCounter} based on roaring bitmap.
+ */
+public class RoaringBitmapCounter implements BitmapCounter {
+
+    private ImmutableRoaringBitmap bitmap;
+
+    RoaringBitmapCounter() {
+        bitmap = new MutableRoaringBitmap();
+    }
+
+    RoaringBitmapCounter(ImmutableRoaringBitmap bitmap) {
+        this.bitmap = bitmap;
+    }
+
+    private MutableRoaringBitmap getMutableBitmap() {
+        if (bitmap instanceof MutableRoaringBitmap) {
+            return (MutableRoaringBitmap) bitmap;
+        }
+        // convert to mutable bitmap
+        MutableRoaringBitmap result = bitmap.toMutableRoaringBitmap();
+        bitmap = result;
+        return result;
+    }
+
+    @Override
+    public void add(int value) {
+        getMutableBitmap().add(value);
+    }
+
+    @Override
+    public void orWith(BitmapCounter another) {
+        if (another instanceof RoaringBitmapCounter) {
+            RoaringBitmapCounter input = (RoaringBitmapCounter) another;
+            getMutableBitmap().or(input.bitmap);
+            return;
+        }
+        throw new IllegalArgumentException("Unsupported type: " + another.getClass().getCanonicalName());
+    }
+
+    @Override
+    public void andWith(BitmapCounter another) {
+        if (another instanceof RoaringBitmapCounter) {
+            RoaringBitmapCounter input = (RoaringBitmapCounter) another;
+            getMutableBitmap().and(input.bitmap);
+            return;
+        }
+        throw new IllegalArgumentException("Unsupported type: " + another.getClass().getCanonicalName());
+    }
+
+    @Override
+    public void clear() {
+        bitmap = new MutableRoaringBitmap();
+    }
+
+    @Override
+    public long getCount() {
+        return bitmap.getCardinality();
+    }
+
+    @Override
+    public int getMemBytes() {
+        return bitmap.getSizeInBytes();
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return bitmap.iterator();
+    }
+
+    @Override
+    public void write(ByteBuffer out) throws IOException {
+        if (bitmap instanceof MutableRoaringBitmap) {
+            getMutableBitmap().runOptimize();
+        }
+
+        if (out.remaining() < bitmap.serializedSizeInBytes()) {
+            throw new BufferOverflowException();
+        }
+        bitmap.serialize(new DataOutputStream(new ByteBufferOutputStream(out)));
+    }
+
+    @Override
+    public void readFields(ByteBuffer in) throws IOException {
+        int size = peekLength(in);
+        // make a copy of the content to be safe
+        byte[] dst = new byte[size];
+        in.get(dst);
+
+        // ImmutableRoaringBitmap only maps the buffer, thus faster than constructing a MutableRoaringBitmap.
+        // we'll convert to MutableRoaringBitmap later when mutate is needed
+        bitmap = new ImmutableRoaringBitmap(ByteBuffer.wrap(dst));
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        // only look at the metadata of the bitmap, no deserialization happens
+        ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(in);
+        return bitmap.serializedSizeInBytes();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return (obj instanceof RoaringBitmapCounter) &&
+                bitmap.equals(((RoaringBitmapCounter) obj).bitmap);
+    }
+
+    @Override
+    public int hashCode() {
+        return bitmap.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "RoaringBitmapCounter[" + getCount() + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
new file mode 100644
index 0000000..a71df95
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.bitmap;
+
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class RoaringBitmapCounterFactory implements BitmapCounterFactory { // builds RoaringBitmapCounter instances
+    public static final BitmapCounterFactory INSTANCE = new RoaringBitmapCounterFactory(); // shared instance
+
+    private RoaringBitmapCounterFactory() {} // private: callers go through INSTANCE
+
+    @Override // Returns a new counter built with the no-arg constructor.
+    public BitmapCounter newBitmap() {
+        return new RoaringBitmapCounter();
+    }
+
+    @Override // Returns a new counter wrapping MutableRoaringBitmap.bitmapOf(values).
+    public BitmapCounter newBitmap(int... values) {
+        return new RoaringBitmapCounter(MutableRoaringBitmap.bitmapOf(values));
+    }
+
+    @Override // Returns a new counter populated from `in` via readFields; IOException comes from readFields.
+    public BitmapCounter newBitmap(ByteBuffer in) throws IOException {
+        RoaringBitmapCounter counter = new RoaringBitmapCounter();
+        counter.readFields(in);
+        return counter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
index 39921c2..4e67d22 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
@@ -33,7 +33,8 @@ import org.apache.kylin.measure.basic.LongMaxAggregator;
 import org.apache.kylin.measure.basic.LongMinAggregator;
 import org.apache.kylin.measure.basic.LongSumAggregator;
 import org.apache.kylin.measure.bitmap.BitmapAggregator;
-import org.apache.kylin.measure.bitmap.MutableBitmapCounter;
+import org.apache.kylin.measure.bitmap.BitmapCounter;
+import org.apache.kylin.measure.bitmap.RoaringBitmapCounterFactory;
 import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
 import org.apache.kylin.measure.hllc.HLLCAggregator;
 import org.apache.kylin.measure.hllc.HLLCounter;
@@ -104,7 +105,7 @@ public class AggregatorMemEstimateTest extends LocalFileMetadataTestCase {
         hllcAggregator.aggregate(new HLLCounter(14));
 
         BitmapAggregator bitmapAggregator = new BitmapAggregator();
-        MutableBitmapCounter bitmapCounter = new MutableBitmapCounter();
+        BitmapCounter bitmapCounter = RoaringBitmapCounterFactory.INSTANCE.newBitmap();
         for (int i = 4000; i <= 100000; i += 2) {
             bitmapCounter.add(i);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java
index a52e427..0b82fc4 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapAggregatorTest.java
@@ -19,30 +19,72 @@
 package org.apache.kylin.measure.bitmap;
 
 import org.junit.Test;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class BitmapAggregatorTest {
+    private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
 
     @Test
     public void testAggregator() {
         BitmapAggregator aggregator = new BitmapAggregator();
         assertNull(null, aggregator.getState());
 
-        aggregator.aggregate(new ImmutableBitmapCounter(
-                MutableRoaringBitmap.bitmapOf(10, 20, 30, 40)
-        ));
+        aggregator.aggregate(factory.newBitmap(10, 20, 30, 40)); // 4 distinct values
         assertEquals(4, aggregator.getState().getCount());
 
-        aggregator.aggregate(new ImmutableBitmapCounter(
-                MutableRoaringBitmap.bitmapOf(25, 30, 35, 40, 45)
-        ));
+        aggregator.aggregate(factory.newBitmap(25, 30, 35, 40, 45)); // 30 and 40 repeat; 25, 35, 45 are new -> 7
         assertEquals(7, aggregator.getState().getCount());
 
         aggregator.reset();
         assertNull(aggregator.getState());
     }
 
+    @Test // Aggregating counters read back from a buffer must match OR-ing the original counters.
+    public void testAggregatorDeserializedCounter() throws IOException {
+        BitmapCounter counter1 = factory.newBitmap(1, 3, 5);
+        BitmapCounter counter2 = factory.newBitmap(1, 2, 4, 6);
+        BitmapCounter counter3 = factory.newBitmap(1, 5, 7);
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+        counter1.write(buffer); // the three counters are written back to back into one buffer
+        counter2.write(buffer);
+        counter3.write(buffer);
+        buffer.flip(); // switch the buffer from writing to reading
+
+        BitmapAggregator aggregator = new BitmapAggregator();
+
+        // first: read counter1 back; aggregating it alone leaves a state equal to counter1
+        BitmapCounter next = factory.newBitmap(buffer);
+        assertEquals(counter1, next);
+
+        aggregator.aggregate(next);
+        assertEquals(counter1, aggregator.getState());
+
+        // second: read counter2 back from the same buffer and aggregate it
+        next = factory.newBitmap(buffer);
+        assertEquals(counter2, next);
+
+        aggregator.aggregate(next);
+        assertEquals(6, aggregator.getState().getCount()); // {1,3,5} and {1,2,4,6} share only 1 -> 6 distinct
+
+        // third: read counter3 back and aggregate it
+        next = factory.newBitmap(buffer);
+        assertEquals(counter3, next);
+
+        aggregator.aggregate(next);
+        assertEquals(7, aggregator.getState().getCount()); // of {1,5,7} only 7 is new -> 7 distinct
+
+        BitmapCounter result = factory.newBitmap(); // expected state: OR of the three original counters
+        result.orWith(counter1);
+        result.orWith(counter2);
+        result.orWith(counter3);
+        assertEquals(result, aggregator.getState());
+
+
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java
index ee7733f..7194a23 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java
@@ -19,23 +19,21 @@
 package org.apache.kylin.measure.bitmap;
 
 import org.junit.Test;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class BitmapCounterTest {
+    private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
 
     @Test
     public void testBitmapCounter() {
-        ImmutableBitmapCounter counter = new ImmutableBitmapCounter(
-                MutableRoaringBitmap.bitmapOf(10, 20, 30, 1000)
-        );
+        BitmapCounter counter = factory.newBitmap(10, 20, 30, 1000);
         assertEquals(4, counter.getCount());
         assertTrue(counter.getMemBytes() > 0);
 
-        MutableBitmapCounter counter2 = new MutableBitmapCounter();
+        BitmapCounter counter2 = factory.newBitmap();
         assertEquals(0, counter2.getCount());
         counter2.add(10);
         counter2.add(30);
@@ -58,28 +56,4 @@ public class BitmapCounterTest {
         assertEquals(0, counter2.getCount());
     }
 
-    @Test
-    public void testToMutableBitmapCounter() {
-        ImmutableBitmapCounter immutable = new ImmutableBitmapCounter(
-                MutableRoaringBitmap.bitmapOf(10, 20, 30, 1000)
-        );
-        MutableBitmapCounter mutable = new MutableBitmapCounter();
-        mutable.orWith(immutable);
-
-        assertEquals(4, immutable.getCount());
-        assertEquals(4, mutable.getCount());
-        assertTrue(immutable.equals(mutable));
-        assertTrue(mutable.equals(immutable));
-
-        MutableBitmapCounter newCounter = immutable.toMutable();
-        newCounter.add(40);
-        assertEquals(4, immutable.getCount());
-        assertEquals(5, newCounter.getCount());
-
-        newCounter = mutable.toMutable();
-        newCounter.add(40);
-        assertEquals(4, mutable.getCount());
-        assertEquals(5, newCounter.getCount());
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/38c3e7bf/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java
index e5b828d..acbfe88 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapSerializerTest.java
@@ -24,7 +24,6 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
@@ -47,9 +46,7 @@ public class BitmapSerializerTest extends LocalFileMetadataTestCase {
     public void testBitmapSerDe() {
         BitmapSerializer serializer = new BitmapSerializer(DataType.ANY);
 
-        ImmutableBitmapCounter counter = new ImmutableBitmapCounter(
-                MutableRoaringBitmap.bitmapOf(1, 1234, 5678, 100000)
-        );
+        BitmapCounter counter = RoaringBitmapCounterFactory.INSTANCE.newBitmap(1, 1234, 5678, 100000);
 
         ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
         serializer.serialize(counter, buffer);