You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/21 04:16:09 UTC

[03/13] incubator-kylin git commit: KYLIN-943 initial commit for function and performance test

KYLIN-943 initial commit for function and performance test


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d37a363f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d37a363f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d37a363f

Branch: refs/heads/KYLIN-943
Commit: d37a363f4b6d8414d1f1214e8cc62b11c6e0737a
Parents: 35c57b2
Author: shaofengshi <sh...@apache.org>
Authored: Mon Aug 24 10:05:27 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Sep 21 10:11:53 2015 +0800

----------------------------------------------------------------------
 core-common/pom.xml                             |   4 +
 .../org/apache/kylin/common/topn/Counter.java   |  85 +++++
 .../kylin/common/topn/DoublyLinkedList.java     | 188 +++++++++++
 .../org/apache/kylin/common/topn/ITopK.java     |  53 +++
 .../org/apache/kylin/common/topn/ListNode2.java |  51 +++
 .../apache/kylin/common/topn/TopNCounter.java   | 336 +++++++++++++++++++
 .../common/topn/TopNCounterComparisonTest.java  | 281 ++++++++++++++++
 .../kylin/common/topn/TopNCounterTest.java      | 174 ++++++++++
 pom.xml                                         |   6 +
 9 files changed, 1178 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 577db42..f9e715d 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -65,6 +65,10 @@
             <artifactId>commons-email</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-httpclient</groupId>
             <artifactId>commons-httpclient</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
new file mode 100644
index 0000000..0f7d8de
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Modified from Counter.java in https://github.com/addthis/stream-lib
+ * 
+ * @param <T> type of the item being counted
+ */
+public class Counter<T> implements Externalizable {
+
+    protected ListNode2<TopNCounter<T>.Bucket> bucketNode;
+
+    protected T item;
+    protected double count;
+    protected double error;
+
+    /**
+     * For de-serialization
+     */
+    public Counter() {
+    }
+
+    public Counter(ListNode2<TopNCounter<T>.Bucket> bucket, T item) {
+        this.bucketNode = bucket;
+        this.count = 0;
+        this.error = 0;
+        this.item = item;
+    }
+
+    public T getItem() {
+        return item;
+    }
+
+    public double getCount() {
+        return count;
+    }
+
+    public double getError() {
+        return error;
+    }
+
+    @Override
+    public String toString() {
+        return item + ":" + count + ':' + error;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        item = (T) in.readObject();
+        count = in.readDouble();
+        error = in.readDouble();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(item);
+        out.writeDouble(count);
+        out.writeDouble(error);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
new file mode 100644
index 0000000..0942b84
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+
+/**
+ * Modified from DoublyLinkedList.java in https://github.com/addthis/stream-lib
+ * 
+ * @param <T> type of the values stored in the list
+ */
+public class DoublyLinkedList<T> implements Iterable<T> {
+
+    protected int size;
+    protected ListNode2<T> tail;
+    protected ListNode2<T> head;
+
+    /**
+     * Appends a new node holding the value at the head end of the list (the end iterated last).
+     */
+    public ListNode2<T> add(T value) {
+        ListNode2<T> node = new ListNode2<T>(value);
+        if (size++ == 0) {
+            tail = node;
+        } else {
+            node.prev = head;
+            head.next = node;
+        }
+
+        head = node;
+
+        return node;
+    }
+
+    /**
+     * Inserts a new node holding the value at the tail end of the list (the end iterated first).
+     */
+    public ListNode2<T> enqueue(T value) {
+        ListNode2<T> node = new ListNode2<T>(value);
+        if (size++ == 0) {
+            head = node;
+        } else {
+            node.next = tail;
+            tail.prev = node;
+        }
+
+        tail = node;
+
+        return node;
+    }
+
+    public void add(ListNode2<T> node) {
+        node.prev = head;
+        node.next = null;
+
+        if (size++ == 0) {
+            tail = node;
+        } else {
+            head.next = node;
+        }
+
+        head = node;
+    }
+
+    public ListNode2<T> addAfter(ListNode2<T> node, T value) {
+        ListNode2<T> newNode = new ListNode2<T>(value);
+        addAfter(node, newNode);
+        return newNode;
+    }
+
+    public void addAfter(ListNode2<T> node, ListNode2<T> newNode) {
+        newNode.next = node.next;
+        newNode.prev = node;
+        node.next = newNode;
+        if (newNode.next == null) {
+            head = newNode;
+        } else {
+            newNode.next.prev = newNode;
+        }
+        size++;
+    }
+
+    public void remove(ListNode2<T> node) {
+        if (node == tail) {
+            tail = node.next;
+        } else {
+            node.prev.next = node.next;
+        }
+
+        if (node == head) {
+            head = node.prev;
+        } else {
+            node.next.prev = node.prev;
+        }
+        size--;
+    }
+
+    public int size() {
+        return size;
+    }
+
+
+    @Override
+    public Iterator<T> iterator() {
+        return new DoublyLinkedListIterator(this);
+    }
+
+    protected class DoublyLinkedListIterator implements Iterator<T> {
+
+        protected DoublyLinkedList<T> list;
+        protected ListNode2<T> itr;
+        protected int length;
+
+        public DoublyLinkedListIterator(DoublyLinkedList<T> list) {
+            this.length = list.size;
+            this.list = list;
+            this.itr = list.tail;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return itr != null;
+        }
+
+        @Override
+        public T next() {
+            if (length != list.size) {
+                throw new ConcurrentModificationException();
+            }
+            T next = itr.value;
+            itr = itr.next;
+            return next;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    public T first() {
+        return tail == null ? null : tail.getValue();
+    }
+
+    public T last() {
+        return head == null ? null : head.getValue();
+    }
+
+    public ListNode2<T> head() {
+        return head;
+    }
+
+    public ListNode2<T> tail() {
+        return tail;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    @SuppressWarnings("unchecked")
+    public T[] toArray() {
+        T[] a = (T[]) new Object[size];
+        int i = 0;
+        for (T v : this) {
+            a[i++] = v;
+        }
+        return a;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java b/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
new file mode 100644
index 0000000..36603b7
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import java.util.List;
+
+/**
+ * Modified from https://github.com/addthis/stream-lib
+ *  
+ * @param <T> type of the elements offered
+ */
+public interface ITopK<T> {
+
+    /**
+     * offer a single element to the top.
+     *
+     * @param element - the element to add to the top
+     * @return false if the element was already in the top
+     */
+    boolean offer(T element);
+
+    /**
+     * offer a single element to the top and increment the count
+     * for that element by incrementCount.
+     *
+     * @param element        - the element to add to the top
+     * @param incrementCount - the amount by which to increment the given element's count
+     * @return false if the element was already in the top
+     */
+    boolean offer(T element, double incrementCount);
+
+    /**
+     * @param k maximum number of elements to return
+     * @return top k elements offered (may be an approximation)
+     */
+    List<T> peek(int k);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java b/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
new file mode 100644
index 0000000..92f5f57
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+/**
+ * Modified from ListNode2.java in https://github.com/addthis/stream-lib
+ *  
+ * @param <T> type of the value held by the node
+ */
+public class ListNode2<T> {
+
+    protected T value;
+    protected ListNode2<T> prev;
+    protected ListNode2<T> next;
+
+    public ListNode2(T value) {
+        this.value = value;
+    }
+
+    public ListNode2<T> getPrev() {
+        return prev;
+    }
+
+    public ListNode2<T> getNext() {
+        return next;
+    }
+
+    public T getValue() {
+        return value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
new file mode 100644
index 0000000..0a45d0b
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.util.Pair;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
+ * 
+ * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
+ * data structure as described in:
+ * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
+ * by Metwally, Agrawal, and Abbadi
+ *
+ * @param <T> type of data in the stream to be summarized
+ */
+public class TopNCounter<T> implements ITopK<T>, Externalizable {
+
+    protected class Bucket {
+
+        protected DoublyLinkedList<Counter<T>> counterList;
+
+        private double count;
+
+        public Bucket(double count) {
+            this.count = count;
+            this.counterList = new DoublyLinkedList<Counter<T>>();
+        }
+    }
+
+    protected int capacity;
+    private HashMap<T, ListNode2<Counter<T>>> counterMap;
+    protected DoublyLinkedList<Bucket> bucketList;
+
+    /**
+     * @param capacity maximum size (larger capacities improve accuracy)
+     */
+    public TopNCounter(int capacity) {
+        this.capacity = capacity;
+        counterMap = new HashMap<T, ListNode2<Counter<T>>>();
+        bucketList = new DoublyLinkedList<Bucket>();
+    }
+
+    public int getCapacity() {
+        return capacity;
+    }
+
+    /**
+     * Algorithm: <i>Space-Saving</i>
+     *
+     * @param item stream element (<i>e</i>)
+     * @return false if item was already in the stream summary, true otherwise
+     */
+    @Override
+    public boolean offer(T item) {
+        return offer(item, 1.0);
+    }
+
+    /**
+     * Algorithm: <i>Space-Saving</i>
+     * @param item stream element (<i>e</i>)
+     * @param incrementCount amount added to the item's count
+     * @return false if item was already in the stream summary, true otherwise
+     */
+    @Override
+    public boolean offer(T item, double incrementCount) {
+        return offerReturnAll(item, incrementCount).getFirst();
+    }
+
+    /**
+     * @param item stream element (<i>e</i>)
+     * @return item dropped from summary if an item was dropped, null otherwise
+     */
+    public T offerReturnDropped(T item, int incrementCount) {
+        return offerReturnAll(item, incrementCount).getSecond();
+    }
+
+    /**
+     * @param item stream element (<i>e</i>)
+     * @return {@code Pair<isNewItem, itemDropped>} where isNewItem is the return value of offer() and itemDropped is null if no item was dropped
+     */
+    public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
+        ListNode2<Counter<T>> counterNode = counterMap.get(item);
+        boolean isNewItem = (counterNode == null);
+        T droppedItem = null;
+        if (isNewItem) {
+
+            if (size() < capacity) {
+                counterNode = bucketList.enqueue(new Bucket(0)).getValue().counterList.add(new Counter<T>(bucketList.tail(), item));
+            } else {
+                Bucket min = bucketList.first();
+                counterNode = min.counterList.tail();
+                Counter<T> counter = counterNode.getValue();
+                droppedItem = counter.item;
+                counterMap.remove(droppedItem);
+                counter.item = item;
+                counter.error = min.count;
+            }
+            counterMap.put(item, counterNode);
+        }
+
+        incrementCounter(counterNode, incrementCount);
+
+        return new Pair<Boolean, T>(isNewItem, droppedItem);
+    }
+
+    protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
+        Counter<T> counter = counterNode.getValue(); // count_i
+        ListNode2<Bucket> oldNode = counter.bucketNode;
+        Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
+        bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
+        counter.count = counter.count + incrementCount;
+
+        // Finding the right bucket for count_i
+        // Because a single call may increase the count by more than one, this may not be the adjacent bucket.
+        ListNode2<Bucket> bucketNodePrev = oldNode;
+        ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
+        while (bucketNodeNext != null) {
+            Bucket bucketNext = bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
+            if (counter.count == bucketNext.count) {
+                bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
+                break;
+            } else if (counter.count > bucketNext.count) {
+                bucketNodePrev = bucketNodeNext;
+                bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
+            } else {
+                // A new bucket has to be created
+                bucketNodeNext = null;
+            }
+        }
+
+        if (bucketNodeNext == null) {
+            Bucket bucketNext = new Bucket(counter.count);
+            bucketNext.counterList.add(counterNode);
+            bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
+        }
+        counter.bucketNode = bucketNodeNext;
+
+        //Cleaning up
+        if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
+        {
+            bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
+        }
+    }
+
+    @Override
+    public List<T> peek(int k) {
+        List<T> topK = new ArrayList<T>(k);
+
+        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Bucket b = bNode.getValue();
+            for (Counter<T> c : b.counterList) {
+                if (topK.size() == k) {
+                    return topK;
+                }
+                topK.add(c.item);
+            }
+        }
+
+        return topK;
+    }
+
+    public List<Counter<T>> topK(int k) {
+        List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
+
+        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Bucket b = bNode.getValue();
+            for (Counter<T> c : b.counterList) {
+                if (topK.size() == k) {
+                    return topK;
+                }
+                topK.add(c);
+            }
+        }
+
+        return topK;
+    }
+
+    /**
+     * @return number of items stored
+     */
+    public int size() {
+        return counterMap.size();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Bucket b = bNode.getValue();
+            sb.append('{');
+            sb.append(b.count);
+            sb.append(":[");
+            for (Counter<T> c : b.counterList) {
+                sb.append('{');
+                sb.append(c.item);
+                sb.append(':');
+                sb.append(c.error);
+                sb.append("},");
+            }
+            if (b.counterList.size() > 0) {
+                sb.deleteCharAt(sb.length() - 1);
+            }
+            sb.append("]},");
+        }
+        if (bucketList.size() > 0) {
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        sb.append(']');
+        return sb.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        this.bucketList = new DoublyLinkedList<Bucket>();
+        this.capacity = in.readInt();
+
+        int size = in.readInt();
+        this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
+
+        Bucket currentBucket = null;
+        ListNode2<Bucket> currentBucketNode = null;
+        for (int i = 0; i < size; i++) {
+            Counter<T> c = (Counter<T>) in.readObject();
+            if (currentBucket == null || c.count != currentBucket.count) {
+                currentBucket = new Bucket(c.count);
+                currentBucketNode = bucketList.add(currentBucket);
+            }
+            c.bucketNode = currentBucketNode;
+            counterMap.put(c.item, currentBucket.counterList.add(c));
+        }
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(this.capacity);
+        out.writeInt(this.size());
+        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
+            Bucket b = bNode.getValue();
+            for (Counter<T> c : b.counterList) {
+                out.writeObject(c);
+            }
+        }
+    }
+
+    /**
+     * For de-serialization
+     */
+    public TopNCounter() {
+    }
+
+    /**
+     * For de-serialization
+     *
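+     * @param bytes serialized form, as produced by {@link #toBytes()}
+     * @throws java.io.IOException if the bytes cannot be read as an object stream
+     * @throws ClassNotFoundException if the class of a serialized object cannot be found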
+     */
+    public TopNCounter(byte[] bytes) throws IOException, ClassNotFoundException {
+        fromBytes(bytes);
+    }
+
+    public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
+        readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
+    }
+
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(baos);
+        this.writeExternal(out);
+        out.flush();
+        return baos.toByteArray();
+
+    }
+
+    public TopNCounter<T> merge(TopNCounter<T> another) throws IOException, ClassNotFoundException {
+        TopNCounter<T> secondCounter = new TopNCounter<T>(another.capacity);
+        secondCounter.fromBytes(another.toBytes());
+        double m1 = 0.0, m2 = 0.0;
+        if (this.size() >= this.capacity) {
+            m1 = this.bucketList.tail().getValue().count;
+        }
+
+        if (secondCounter.size() >= secondCounter.capacity) {
+            m2 = secondCounter.bucketList.tail().getValue().count;
+        }
+
+        for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
+            T item = entry.getKey();
+            ListNode2<Counter<T>> existing = secondCounter.counterMap.get(item);
+            if (existing != null) {
+                this.offer(item, secondCounter.counterMap.get(item).getValue().count);
+                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error+ secondCounter.counterMap.get(item).getValue().error;
+
+                secondCounter.counterMap.remove(item);
+            } else {
+                this.offer(item, m2);
+                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
+            }
+        }
+
+        for (Map.Entry<T, ListNode2<Counter<T>>> entry : secondCounter.counterMap.entrySet()) {
+            T item = entry.getKey();
+            double counter = entry.getValue().getValue().count;
+            double error = entry.getValue().getValue().error;
+            this.offer(item, counter + m1);
+            this.counterMap.get(item).getValue().error = error + m1;
+        }
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
new file mode 100644
index 0000000..fb82522
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class TopNCounterComparisonTest {
+
+    private static final int TOP_K = 100;
+
+    private static final int KEY_SPACE = 100 * TOP_K;
+
+    private static final int TOTAL_RECORDS = 100 * KEY_SPACE;
+
+    private static final int SPACE_SAVING_ROOM = 100;
+
+    @Before
+    public void setup() {
+        // Intentionally empty: each test in this class prepares its own data file and counters.
+    }
+
+    @After
+    public void tearDown() {
+        // Intentionally empty: each test in this class deletes the data file it generated.
+    }
+
+    /**
+     * Generates a temporary text file holding {@code TOTAL_RECORDS} lines, one key per line.
+     * Keys are random 10-letter strings taken from a pool of {@code KEY_SPACE} candidates and
+     * are picked with a Zipf distribution (exponent 0.8), so a small set of keys dominates.
+     *
+     * @return absolute path of the generated file; callers delete it when they are done
+     * @throws IOException if the temporary file cannot be created or written
+     */
+    protected String prepareTestDate() throws IOException {
+        String[] allKeys = new String[KEY_SPACE];
+
+        for (int i = 0; i < KEY_SPACE; i++) {
+            allKeys[i] = RandomStringUtils.randomAlphabetic(10);
+        }
+
+        System.out.println("Start to create test random data...");
+        long startTime = System.currentTimeMillis();
+        // Samples fall in [1, KEY_SPACE - 1], so every sample is a valid index into allKeys
+        // (index 0 is simply never drawn).
+        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+
+        // createTempFile already creates a fresh, uniquely named, empty file. Write to it
+        // directly instead of deleting and re-creating it, which would only reopen a window
+        // in which another process could claim the same name.
+        File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+        FileWriter fw = new FileWriter(tempFile);
+        try {
+            for (int i = 0; i < TOTAL_RECORDS; i++) {
+                int keyIndex = zipf.sample();
+                fw.write(allKeys[keyIndex]);
+                fw.write('\n');
+            }
+        } finally {
+            fw.close();
+        }
+
+        System.out.println("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
+        System.out.println("Test data in : " + tempFile.getAbsolutePath());
+
+        return tempFile.getAbsolutePath();
+    }
+
+    /**
+     * Feeds the complete data file to one space-saving counter and to an exact hash-map
+     * counter, then checks that both report the same top {@code TOP_K} entries.
+     *
+     * NOTE(review): the {@code @Test} annotation below is commented out, so JUnit does not
+     * run this method; the reason is not recorded here.
+     *
+     * @throws IOException if the data file cannot be written, read or deleted
+     */
+    //@Test
+    public void testCorrectness() throws IOException {
+        String dataFile = prepareTestDate();
+        SpaceSavingConsumer spaceSavingCounter = new SpaceSavingConsumer();
+        HashMapConsumer accurateCounter = new HashMapConsumer();
+
+        try {
+            for (TestDataConsumer consumer : new TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+                feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
+            }
+        } finally {
+            // Remove the generated file even when feeding fails, so failed runs do not pile up temp files.
+            FileUtils.forceDelete(new File(dataFile));
+        }
+
+        compareResult(spaceSavingCounter, accurateCounter);
+    }
+
+    private void compareResult(TopNCounterComparisonTest.TestDataConsumer firstConsumer, TopNCounterComparisonTest.TestDataConsumer secondConsumer) {
+        List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+        System.out.println("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
+        List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
+        System.out.println("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
+
+        int error = 0;
+        for (int i = 0; i < topResult1.size(); i++) {
+            System.out.println("Compare " + i);
+
+            //            if (topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+            if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst())
+                    && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+                System.out.println("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+            } else {
+                System.out.println("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+                System.out.println("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+                error++;
+            }
+        }
+
+        Assert.assertEquals(0, error);
+    }
+
+    /**
+     * Splits the data file into equal slices, feeds each slice to its own space-saving
+     * counter, merges the counters pairwise and checks the merged top {@code TOP_K} against
+     * an exact hash-map count of the whole file.
+     *
+     * @throws IOException            if the data file cannot be written, read or deleted
+     * @throws ClassNotFoundException declared by the merge helper
+     */
+    @Test
+    public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
+        String dataFile = prepareTestDate();
+
+        try {
+            final int parallel = 10;
+            SpaceSavingConsumer[] parallelCounters = new SpaceSavingConsumer[parallel];
+
+            for (int i = 0; i < parallel; i++) {
+                parallelCounters[i] = new SpaceSavingConsumer();
+            }
+
+            int slice = TOTAL_RECORDS / parallel;
+            int startPosition = 0;
+            for (int i = 0; i < parallel; i++) {
+                feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
+                startPosition += slice;
+            }
+
+            // Merge the per-slice counters; the combined result ends up in element 0.
+            SpaceSavingConsumer[] mergedCounters = mergeSpaceSavingConsumer(parallelCounters);
+
+            HashMapConsumer accurateCounter = new HashMapConsumer();
+            feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+            compareResult(mergedCounters[0], accurateCounter);
+        } finally {
+            // Delete the generated file even when an assertion fails or I/O throws,
+            // otherwise every failed run leaves a large temp file behind.
+            FileUtils.forceDelete(new File(dataFile));
+        }
+    }
+
+    /**
+     * Merges the given consumers pairwise, round after round, until at most one remains.
+     * In each round the counter at every odd index is merged into its left neighbour
+     * (which is mutated); an unpaired last element is carried over unchanged.
+     *
+     * @param consumers consumers to merge; an empty or single-element array is returned as is
+     * @return an array holding the single merged consumer, or {@code consumers} itself when
+     *         it has fewer than two elements
+     */
+    private TopNCounterComparisonTest.SpaceSavingConsumer[] mergeSpaceSavingConsumer(TopNCounterComparisonTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+        // "<= 1" rather than "== 1": an empty array would otherwise recurse forever.
+        if (consumers.length <= 1)
+            return consumers;
+
+        List<SpaceSavingConsumer> survivors = Lists.newArrayList();
+        for (int i = 0, n = consumers.length; i < n; i += 2) {
+            if (i + 1 < n) {
+                consumers[i].vs.merge(consumers[i + 1].vs);
+            }
+
+            survivors.add(consumers[i]);
+        }
+
+        return mergeSpaceSavingConsumer(survivors.toArray(new SpaceSavingConsumer[survivors.size()]));
+    }
+
+    /**
+     * Streams the lines with zero-based index in {@code [startLine, endLine)} from the data
+     * file into the consumer, each with a value of 1.0, and logs the elapsed time.
+     *
+     * @param dataFile  path of the file to read, one key per line
+     * @param consumer  receiver of the selected lines
+     * @param startLine index of the first line to feed (inclusive)
+     * @param endLine   index at which feeding stops (exclusive)
+     * @throws IOException if the file cannot be opened or read
+     */
+    private void feedDataToConsumer(String dataFile, TopNCounterComparisonTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
+        long startTime = System.currentTimeMillis();
+        BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
+
+        try {
+            int lineNum = 0;
+            String line;
+            // Stop as soon as endLine is reached; lines beyond it would be discarded anyway.
+            while (lineNum < endLine && (line = bufferedReader.readLine()) != null) {
+                if (lineNum >= startLine) {
+                    consumer.addElement(line, 1.0);
+                }
+                lineNum++;
+            }
+        } finally {
+            // Close the reader even when reading or the consumer throws.
+            bufferedReader.close();
+        }
+
+        System.out.println("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+    }
+
+    /**
+     * A sink for (key, value) records that can report its highest-valued keys and the time
+     * it has spent doing its work.
+     */
+    private interface TestDataConsumer {
+        /** Records one occurrence of {@code elementKey} carrying the given value. */
+        void addElement(String elementKey, double value);
+
+        /** Returns the top {@code k} entries as (key, value) pairs. */
+        List<Pair<String, Double>> getTopN(int k);
+
+        /** Returns the time accumulated by this consumer, in milliseconds. */
+        long getSpentTime();
+    }
+
+    /**
+     * Consumer backed by a {@link TopNCounter} sized to {@code TOP_K * SPACE_SAVING_ROOM}
+     * entries. Time spent inside the counter is accumulated in milliseconds.
+     */
+    private class SpaceSavingConsumer implements TopNCounterComparisonTest.TestDataConsumer {
+        private long timeSpent = 0;
+        // Accessed directly by the enclosing test when merging counters.
+        protected TopNCounter<String> vs;
+
+        public SpaceSavingConsumer() {
+            vs = new TopNCounter<String>(TOP_K * SPACE_SAVING_ROOM);
+        }
+
+        /** Offers {@code key} to the counter with the given increment. */
+        @Override
+        public void addElement(String key, double value) {
+            long startTime = System.currentTimeMillis();
+            vs.offer(key, value);
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        /** Returns the counter's top {@code k} entries as (item, count) pairs. */
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Counter<String>> tops = vs.topK(k);
+            List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+            for (Counter<String> counter : tops) {
+                // Parameterized Pair instead of the raw type, so no unchecked conversion is needed.
+                allRecords.add(new Pair<String, Double>(counter.getItem(), counter.getCount()));
+            }
+            timeSpent += (System.currentTimeMillis() - startTime);
+            return allRecords;
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
+    }
+
+    /**
+     * Exact reference consumer: sums the values of every key in a hash map and produces the
+     * top entries by sorting all of them. Time spent is accumulated in milliseconds.
+     */
+    private class HashMapConsumer implements TopNCounterComparisonTest.TestDataConsumer {
+
+        private long timeSpent = 0;
+        private Map<String, Double> hashMap;
+
+        public HashMapConsumer() {
+            hashMap = Maps.newHashMap();
+        }
+
+        /** Adds {@code value} to the running total of {@code key}. */
+        @Override
+        public void addElement(String key, double value) {
+            long startTime = System.currentTimeMillis();
+            // Single lookup instead of containsKey() followed by get().
+            Double current = hashMap.get(key);
+            hashMap.put(key, current == null ? value : current + value);
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        /**
+         * Returns the entries with the largest totals in descending order.
+         *
+         * @param k maximum number of entries to return
+         * @return at most {@code k} (key, total) pairs; fewer when fewer distinct keys were seen
+         */
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+            for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+                allRecords.add(new Pair<String, Double>(entry.getKey(), entry.getValue()));
+            }
+
+            Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
+                @Override
+                public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
+                    // Arguments swapped on purpose: larger totals sort first.
+                    return Double.compare(o2.getSecond(), o1.getSecond());
+                }
+            });
+            timeSpent += (System.currentTimeMillis() - startTime);
+            // Clamp the upper bound: subList(0, k) throws when fewer than k keys exist.
+            return allRecords.subList(0, Math.min(k, allRecords.size()));
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
new file mode 100644
index 0000000..23620d1
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TopNCounterTest {
+
+    private static final int NUM_ITERATIONS = 100000;
+
+    /**
+     * Smoke test: offers a short stream to a capacity-3 counter, printing the counter after
+     * every offer and finally each entry of {@code topK(6)}. It makes no assertions.
+     */
+    @Test
+    public void testTopNCounter() {
+        TopNCounter<String> counter = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+        for (int idx = 0; idx < stream.length; idx++) {
+            counter.offer(stream[idx]);
+            System.out.println(counter);
+        }
+
+        for (Counter<String> entry : counter.topK(6)) {
+            System.out.println(entry.getItem() + ":" + entry.getCount() + ":" + entry.getError());
+        }
+    }
+
+    /**
+     * Offers a fixed stream one item at a time to a capacity-3 counter and checks that every
+     * item reported by {@code topK(3)} is one of "A", "C" or "X".
+     */
+    @Test
+    public void testTopK() {
+        TopNCounter<String> counter = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (int idx = 0; idx < stream.length; idx++) {
+            counter.offer(stream[idx]);
+        }
+
+        List<String> expectedItems = Arrays.asList("A", "C", "X");
+        for (Counter<String> entry : counter.topK(3)) {
+            assertTrue(expectedItems.contains(entry.getItem()));
+        }
+    }
+
+    /**
+     * Same stream as the single-offer top-K test, but every item is offered with an
+     * increment of 10; the items reported by {@code topK(3)} must still be among
+     * "A", "C" and "X".
+     */
+    @Test
+    public void testTopKWithIncrement() {
+        TopNCounter<String> counter = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (int idx = 0; idx < stream.length; idx++) {
+            counter.offer(stream[idx], 10);
+        }
+
+        List<String> expectedItems = Arrays.asList("A", "C", "X");
+        for (Counter<String> entry : counter.topK(3)) {
+            assertTrue(expectedItems.contains(entry.getItem()));
+        }
+    }
+
+    /**
+     * Checks that offering an item once with increment {@code n} ranks items the same way as
+     * offering it {@code n} times with the default increment: both counters must report the
+     * same items, in the same order, from {@code topK(3)}.
+     */
+    @Test
+    public void testTopKWithIncrementOutOfOrder() {
+        TopNCounter<String> vs_increment = new TopNCounter<String>(3);
+        TopNCounter<String> vs_single = new TopNCounter<String>(3);
+        String[] stream = {"A", "B", "C", "D", "A"};
+        // Primitive array: the values are only used as counts, so boxing buys nothing.
+        int[] increments = {15, 20, 25, 30, 1};
+
+        for (int i = 0; i < stream.length; i++) {
+            vs_increment.offer(stream[i], increments[i]);
+            for (int k = 0; k < increments[i]; k++) {
+                vs_single.offer(stream[i]);
+            }
+        }
+        System.out.println("Insert with counts vs. single inserts:");
+        System.out.println(vs_increment);
+        System.out.println(vs_single);
+
+        List<Counter<String>> topK_increment = vs_increment.topK(3);
+        List<Counter<String>> topK_single = vs_single.topK(3);
+
+        // Compare sizes first: the loop below is bounded by one list only, so a size
+        // mismatch would otherwise go unnoticed or surface as an IndexOutOfBoundsException.
+        assertEquals(topK_single.size(), topK_increment.size());
+        for (int i = 0; i < topK_increment.size(); i++) {
+            assertEquals(topK_increment.get(i).getItem(),
+                    topK_single.get(i).getItem());
+        }
+    }
+
+    /**
+     * Round-trips each {@link Counter} returned by {@code topK(3)} through Java object
+     * serialization and checks that count, error and item survive unchanged.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCounterSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> counter = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (int idx = 0; idx < stream.length; idx++) {
+            counter.offer(stream[idx]);
+        }
+
+        for (Counter<String> original : counter.topK(3)) {
+            // Serialize into an in-memory buffer ...
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            ObjectOutput out = new ObjectOutputStream(buffer);
+            out.writeObject(original);
+            out.close();
+
+            // ... and read the same bytes back.
+            byte[] serialized = buffer.toByteArray();
+            ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(serialized));
+            Counter<String> restored = (Counter<String>) in.readObject();
+
+            assertEquals(original.getCount(), restored.getCount(), 0.0001);
+            assertEquals(original.getError(), restored.getError(), 0.0001);
+            assertEquals(original.getItem(), restored.getItem());
+        }
+    }
+
+
+    /**
+     * Round-trips a populated {@link TopNCounter} through Java object serialization and
+     * checks that the restored counter has the same {@code toString()} as the original.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> original = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (int idx = 0; idx < stream.length; idx++) {
+            original.offer(stream[idx]);
+        }
+
+        // Serialize into an in-memory buffer ...
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        ObjectOutput out = new ObjectOutputStream(buffer);
+        out.writeObject(original);
+        out.close();
+
+        // ... and read the same bytes back.
+        byte[] serialized = buffer.toByteArray();
+        ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(serialized));
+        TopNCounter<String> restored = (TopNCounter<String>) in.readObject();
+
+        assertEquals(original.toString(), restored.toString());
+    }
+
+
+    /**
+     * Exercises the byte-array round trip for a populated counter and for a counter created
+     * with capacity 0 that has received no items.
+     */
+    @Test
+    public void testByteSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> populated = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (int idx = 0; idx < stream.length; idx++) {
+            populated.offer(stream[idx]);
+        }
+        testSerialization(populated);
+
+        // Empty
+        TopNCounter<String> empty = new TopNCounter<String>(0);
+        testSerialization(empty);
+    }
+
+    /**
+     * Converts the counter to bytes with {@code toBytes()}, rebuilds a counter from those
+     * bytes and asserts that both have the same {@code toString()}.
+     *
+     * @param vs counter to round-trip
+     */
+    private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
+        TopNCounter<String> restored = new TopNCounter<String>(vs.toBytes());
+
+        assertEquals(vs.toString(), restored.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d37a363f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9ad1ca0..f6c9284 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
         <commons-httpclient.version>3.1</commons-httpclient.version>
         <commons-collections4.version>4.0</commons-collections4.version>
         <commons-email.version>1.1</commons-email.version>
+        <commons-math3.version>3.6-SNAPSHOT</commons-math3.version>
 
         <!-- Spark -->
         <spark.version>1.3.0</spark.version>
@@ -386,6 +387,11 @@
                 <version>${commons-email.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-math3</artifactId>
+                <version>${commons-math3.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>