You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/21 04:16:08 UTC

[02/13] incubator-kylin git commit: Add merge and retain to TopNCounter, and add unit tests

Add merge and retain to TopNCounter, and add unit tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7175845b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7175845b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7175845b

Branch: refs/heads/KYLIN-943
Commit: 7175845b508d0a0b1e179932ff96f9fad243789a
Parents: d37a363
Author: shaofengshi <sh...@apache.org>
Authored: Mon Aug 24 22:22:05 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Sep 21 10:11:53 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/topn/TopNCounter.java   |  50 ++-
 .../kylin/common/topn/TopNCounterBasicTest.java | 174 +++++++++
 .../common/topn/TopNCounterCombinationTest.java |  65 ++++
 .../common/topn/TopNCounterComparisonTest.java  | 281 ---------------
 .../kylin/common/topn/TopNCounterTest.java      | 350 +++++++++++++------
 5 files changed, 519 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7175845b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 0a45d0b..2f337c2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -48,6 +48,10 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
             this.count = count;
             this.counterList = new DoublyLinkedList<Counter<T>>();
         }
+
+        public int size() {
+            return counterList.size();
+        }
     }
 
     protected int capacity;
@@ -297,33 +301,36 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
 
     }
 
-    public TopNCounter<T> merge(TopNCounter<T> another) throws IOException, ClassNotFoundException {
-        TopNCounter<T> secondCounter = new TopNCounter<T>(another.capacity);
-        secondCounter.fromBytes(another.toBytes());
+    /**
+     * Merge another counter into this counter. Note: the other counter is modified by this method, so pass in a copy if it must be preserved.
+     * @param another the counter to merge into this one; it is modified by this call
+     * @return this counter, after the merge
+     */
+    public TopNCounter<T> merge(TopNCounter<T> another) {
         double m1 = 0.0, m2 = 0.0;
         if (this.size() >= this.capacity) {
             m1 = this.bucketList.tail().getValue().count;
         }
 
-        if (secondCounter.size() >= secondCounter.capacity) {
-            m2 = secondCounter.bucketList.tail().getValue().count;
+        if (another.size() >= another.capacity) {
+            m2 = another.bucketList.tail().getValue().count;
         }
 
         for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
             T item = entry.getKey();
-            ListNode2<Counter<T>> existing = secondCounter.counterMap.get(item);
+            ListNode2<Counter<T>> existing = another.counterMap.get(item);
             if (existing != null) {
-                this.offer(item, secondCounter.counterMap.get(item).getValue().count);
-                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error+ secondCounter.counterMap.get(item).getValue().error;
+                this.offer(item, another.counterMap.get(item).getValue().count);
+                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + another.counterMap.get(item).getValue().error;
 
-                secondCounter.counterMap.remove(item);
+                another.counterMap.remove(item);
             } else {
                 this.offer(item, m2);
                 this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
             }
         }
 
-        for (Map.Entry<T, ListNode2<Counter<T>>> entry : secondCounter.counterMap.entrySet()) {
+        for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
             T item = entry.getKey();
             double counter = entry.getValue().getValue().count;
             double error = entry.getValue().getValue().error;
@@ -333,4 +340,27 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
 
         return this;
     }
+
+    /**
+     * Shrink the capacity to {@code newCapacity}. If more counters than that are held,
+     * the counters with the lowest counts are evicted one at a time (from the tail of
+     * the smallest bucket) until the size is at most {@code newCapacity}.
+     * @param newCapacity the new capacity; expected to be positive
+     */
+    public void retain(int newCapacity) {
+        assert newCapacity > 0;
+        this.capacity = newCapacity;
+        while (this.size() > newCapacity) {
+            // the tail bucket has the smallest count (merge() reads it as the minimum)
+            ListNode2<Bucket> minNode = bucketList.tail();
+            Bucket minBucket = minNode.getValue();
+            ListNode2<Counter<T>> victim = minBucket.counterList.tail();
+            minBucket.counterList.remove(victim);
+            this.counterMap.remove(victim.getValue().getItem());
+            if (minBucket.size() == 0) {
+                // unlink the emptied bucket so bucketList.tail() stays the true minimum
+                bucketList.remove(minNode);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7175845b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
new file mode 100644
index 0000000..0d59314
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TopNCounterBasicTest {
+
+    private static final int NUM_ITERATIONS = 100000;
+
+    @Test
+    public void testTopNCounter() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+        for (String i : stream) {
+                vs.offer(i);
+            /*
+        for(String s : vs.poll(3))
+        System.out.print(s+" ");
+             */
+            System.out.println(vs);
+        }
+
+        List<Counter<String>> topk = vs.topK(6);
+        
+        for(Counter<String> top : topk) {
+            System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
+        }
+        
+    }
+
+    @Test
+    public void testTopK() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrement() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i, 10);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrementOutOfOrder() {
+        TopNCounter<String> vs_increment = new TopNCounter<String>(3);
+        TopNCounter<String> vs_single = new TopNCounter<String>(3);
+        String[] stream = {"A", "B", "C", "D", "A"};
+        Integer[] increments = {15, 20, 25, 30, 1};
+
+        for (int i = 0; i < stream.length; i++) {
+            vs_increment.offer(stream[i], increments[i]);
+            for (int k = 0; k < increments[i]; k++) {
+                vs_single.offer(stream[i]);
+            }
+        }
+        System.out.println("Insert with counts vs. single inserts:");
+        System.out.println(vs_increment);
+        System.out.println(vs_single);
+
+        List<Counter<String>> topK_increment = vs_increment.topK(3);
+        List<Counter<String>> topK_single = vs_single.topK(3);
+
+        for (int i = 0; i < topK_increment.size(); i++) {
+            assertEquals(topK_increment.get(i).getItem(),
+                    topK_single.get(i).getItem());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCounterSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutput oo = new ObjectOutputStream(baos);
+            oo.writeObject(c);
+            oo.close();
+
+            ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+            Counter<String> clone = (Counter<String>) oi.readObject();
+            assertEquals(c.getCount(), clone.getCount(), 0.0001);
+            assertEquals(c.getError(), clone.getError(), 0.0001);
+            assertEquals(c.getItem(), clone.getItem());
+        }
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutput oo = new ObjectOutputStream(baos);
+        oo.writeObject(vs);
+        oo.close();
+
+        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+        TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+
+        assertEquals(vs.toString(), clone.toString());
+    }
+
+
+    @Test
+    public void testByteSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        testSerialization(vs);
+
+        // Empty
+        vs = new TopNCounter<String>(0);
+        testSerialization(vs);
+    }
+
+    private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
+        byte[] bytes = vs.toBytes();
+        TopNCounter<String> clone = new TopNCounter<String>(bytes);
+
+        assertEquals(vs.toString(), clone.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7175845b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
new file mode 100644
index 0000000..afde8f7
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
+public class TopNCounterCombinationTest extends TopNCounterTest {
+
+    @Parameterized.Parameters
+    public static Collection<Integer[]> configs() {
+        //       return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
+        return Arrays.asList(new Integer[][] {
+                /*
+                // with 20X space
+                { 100, 10, 20 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 20 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 20 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 20 }, // top 100 among 100,000 keys (top 0.1%)
+                
+                */
+                // with 50X space
+                { 100, 10, 50 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 50 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 50 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 50 }, // top 100 among 100,000 keys (top 0.1%)
+
+                /*
+                // with 100X space
+                { 100, 10, 100 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 100 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 100 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 100 }, // top 100 among 100,000 keys (top 0.1%)
+                */
+        });
+    }
+
+    public TopNCounterCombinationTest(int topK, int keySpaceRate, int spaceSavingRate) throws Exception {
+        super();
+        this.TOP_K = topK;
+        this.KEY_SPACE = TOP_K * keySpaceRate;
+        this.SPACE_SAVING_ROOM = spaceSavingRate;
+        TOTAL_RECORDS = 1000000; // 1 million
+        this.PARALLEL = 50;
+        this.verbose = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7175845b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
deleted file mode 100644
index fb82522..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Copyright (C) 2011 Clearspring Technologies, Inc. 
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.topn;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import junit.framework.Assert;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.math3.distribution.ZipfDistribution;
-import org.apache.kylin.common.util.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class TopNCounterComparisonTest {
-
-    private static final int TOP_K = 100;
-
-    private static final int KEY_SPACE = 100 * TOP_K;
-
-    private static final int TOTAL_RECORDS = 100 * KEY_SPACE;
-
-    private static final int SPACE_SAVING_ROOM = 100;
-
-    @Before
-    public void setup() {
-    }
-
-    @After
-    public void tearDown() {
-    }
-
-    protected String prepareTestDate() throws IOException {
-        String[] allKeys = new String[KEY_SPACE];
-
-        for (int i = 0; i < KEY_SPACE; i++) {
-            allKeys[i] = RandomStringUtils.randomAlphabetic(10);
-        }
-
-        System.out.println("Start to create test random data...");
-        long startTime = System.currentTimeMillis();
-        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
-        int keyIndex;
-
-        File tempFile = File.createTempFile("ZipfDistribution", ".txt");
-
-        if (tempFile.exists())
-            FileUtils.forceDelete(tempFile);
-        FileWriter fw = new FileWriter(tempFile);
-        try {
-            for (int i = 0; i < TOTAL_RECORDS; i++) {
-                keyIndex = zipf.sample();
-                fw.write(allKeys[keyIndex]);
-                fw.write('\n');
-            }
-        } finally {
-            if (fw != null)
-                fw.close();
-        }
-
-        System.out.println("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
-        System.out.println("Test data in : " + tempFile.getAbsolutePath());
-
-        return tempFile.getAbsolutePath();
-    }
-
-    //@Test
-    public void testCorrectness() throws IOException {
-        String dataFile = prepareTestDate();
-        TopNCounterComparisonTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterComparisonTest.SpaceSavingConsumer();
-        TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
-
-        for (TopNCounterComparisonTest.TestDataConsumer consumer : new TopNCounterComparisonTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
-            feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
-        }
-
-        FileUtils.forceDelete(new File(dataFile));
-
-        compareResult(spaceSavingCounter, accurateCounter);
-    }
-
-    private void compareResult(TopNCounterComparisonTest.TestDataConsumer firstConsumer, TopNCounterComparisonTest.TestDataConsumer secondConsumer) {
-        List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
-        System.out.println("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
-        List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
-        System.out.println("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
-
-        int error = 0;
-        for (int i = 0; i < topResult1.size(); i++) {
-            System.out.println("Compare " + i);
-
-            //            if (topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
-            if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst())
-                    && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
-                System.out.println("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
-            } else {
-                System.out.println("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
-                System.out.println("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
-                error++;
-            }
-        }
-
-        Assert.assertEquals(0, error);
-    }
-
-    @Test
-    public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
-        String dataFile = prepareTestDate();
-
-        int PARALLEL = 10;
-        TopNCounterComparisonTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterComparisonTest.SpaceSavingConsumer[PARALLEL];
-
-        for (int i = 0; i < PARALLEL; i++) {
-            parallelCounters[i] = new TopNCounterComparisonTest.SpaceSavingConsumer();
-        }
-
-        int slice = TOTAL_RECORDS / PARALLEL;
-        int startPosition = 0;
-        for (int i = 0; i < PARALLEL; i++) {
-            feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
-            startPosition += slice;
-        }
-
-        // merge counters
-
-        //        for (int i = 1; i < PARALLEL; i++) {
-        //            parallelCounters[0].vs.merge(parallelCounters[i].vs);
-        //        }
-
-        TopNCounterComparisonTest.SpaceSavingConsumer[] mergedCounters = mergeSpaceSavingConsumer(parallelCounters);
-
-        TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
-        feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
-
-        compareResult(mergedCounters[0], accurateCounter);
-        FileUtils.forceDelete(new File(dataFile));
-
-    }
-
-    private TopNCounterComparisonTest.SpaceSavingConsumer[] mergeSpaceSavingConsumer(TopNCounterComparisonTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
-        List<TopNCounterComparisonTest.SpaceSavingConsumer> list = Lists.newArrayList();
-        if (consumers.length == 1)
-            return consumers;
-
-        for (int i = 0, n = consumers.length; i < n; i = i + 2) {
-            if (i + 1 < n) {
-                consumers[i].vs.merge(consumers[i + 1].vs);
-            }
-
-            list.add(consumers[i]);
-        }
-
-        return mergeSpaceSavingConsumer(list.toArray(new TopNCounterComparisonTest.SpaceSavingConsumer[list.size()]));
-    }
-
-    private void feedDataToConsumer(String dataFile, TopNCounterComparisonTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
-        long startTime = System.currentTimeMillis();
-        BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
-
-        int lineNum = 0;
-        String line = bufferedReader.readLine();
-        while (line != null) {
-            if (lineNum >= startLine && lineNum < endLine) {
-                consumer.addElement(line, 1.0);
-            }
-            line = bufferedReader.readLine();
-            lineNum++;
-        }
-
-        bufferedReader.close();
-        System.out.println("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
-    }
-
-    private static interface TestDataConsumer {
-        public void addElement(String elementKey, double value);
-
-        public List<Pair<String, Double>> getTopN(int k);
-
-        public long getSpentTime();
-    }
-
-    private class SpaceSavingConsumer implements TopNCounterComparisonTest.TestDataConsumer {
-        private long timeSpent = 0;
-        protected TopNCounter<String> vs;
-
-        public SpaceSavingConsumer() {
-            vs = new TopNCounter<String>(TOP_K * SPACE_SAVING_ROOM);
-
-        }
-
-        public void addElement(String key, double value) {
-            //System.out.println("Adding " + key + ":" + incrementCount);
-            long startTime = System.currentTimeMillis();
-            vs.offer(key, value);
-            timeSpent += (System.currentTimeMillis() - startTime);
-        }
-
-        @Override
-        public List<Pair<String, Double>> getTopN(int k) {
-            long startTime = System.currentTimeMillis();
-            List<Counter<String>> tops = vs.topK(k);
-            List<Pair<String, Double>> allRecords = Lists.newArrayList();
-
-            for (Counter<String> counter : tops)
-                allRecords.add(new Pair(counter.getItem(), counter.getCount()));
-            timeSpent += (System.currentTimeMillis() - startTime);
-            return allRecords;
-        }
-
-        @Override
-        public long getSpentTime() {
-            return timeSpent;
-        }
-    }
-
-    private class HashMapConsumer implements TopNCounterComparisonTest.TestDataConsumer {
-
-        private long timeSpent = 0;
-        private Map<String, Double> hashMap;
-
-        public HashMapConsumer() {
-            hashMap = Maps.newHashMap();
-        }
-
-        public void addElement(String key, double value) {
-            long startTime = System.currentTimeMillis();
-            if (hashMap.containsKey(key)) {
-                hashMap.put(key, hashMap.get(key) + value);
-            } else {
-                hashMap.put(key, value);
-            }
-            timeSpent += (System.currentTimeMillis() - startTime);
-        }
-
-        @Override
-        public List<Pair<String, Double>> getTopN(int k) {
-            long startTime = System.currentTimeMillis();
-            List<Pair<String, Double>> allRecords = Lists.newArrayList();
-
-            for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
-                allRecords.add(new Pair(entry.getKey(), entry.getValue()));
-            }
-
-            Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
-                @Override
-                public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
-                    return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
-                }
-            });
-            timeSpent += (System.currentTimeMillis() - startTime);
-            return allRecords.subList(0, k);
-        }
-
-        @Override
-        public long getSpentTime() {
-            return timeSpent;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7175845b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index 23620d1..e5350ba 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -16,159 +16,289 @@
 
 package org.apache.kylin.common.topn;
 
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
 import org.junit.Test;
 
 import java.io.*;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+public class TopNCounterTest {
 
+    protected static int TOP_K;
 
-public class TopNCounterTest {
+    protected static int KEY_SPACE;
 
-    private static final int NUM_ITERATIONS = 100000;
+    protected static int TOTAL_RECORDS;
 
-    @Test
-    public void testTopNCounter() {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
-        for (String i : stream) {
-                vs.offer(i);
-            /*
-        for(String s : vs.poll(3))
-        System.out.print(s+" ");
-             */
-            System.out.println(vs);
-        }
-
-        List<Counter<String>> topk = vs.topK(6);
-        
-        for(Counter<String> top : topk) {
-            System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
-        }
-        
+    protected static int SPACE_SAVING_ROOM;
+
+    protected static int PARALLEL = 10;
+
+    protected static boolean verbose = true;
+
+    public TopNCounterTest() {
+        TOP_K = 100;
+        KEY_SPACE = 100 * TOP_K;
+        TOTAL_RECORDS = 1000000; // 1 million
+        SPACE_SAVING_ROOM = 100;
     }
 
-    @Test
-    public void testTopK() {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
+    protected String prepareTestDate() throws IOException {
+        String[] allKeys = new String[KEY_SPACE];
+
+        for (int i = 0; i < KEY_SPACE; i++) {
+            allKeys[i] = RandomStringUtils.randomAlphabetic(10);
         }
-        List<Counter<String>> topK = vs.topK(3);
-        for (Counter<String> c : topK) {
-            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+
+        outputMsg("Start to create test random data...");
+        long startTime = System.currentTimeMillis();
+        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+        int keyIndex;
+
+        File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+        if (tempFile.exists())
+            FileUtils.forceDelete(tempFile);
+        FileWriter fw = new FileWriter(tempFile);
+        try {
+            for (int i = 0; i < TOTAL_RECORDS; i++) {
+                keyIndex = zipf.sample();
+                fw.write(allKeys[keyIndex]);
+                fw.write('\n');
+            }
+        } finally {
+            if (fw != null)
+                fw.close();
         }
+
+        outputMsg("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
+        outputMsg("Test data in : " + tempFile.getAbsolutePath());
+
+        return tempFile.getAbsolutePath();
     }
 
     @Test
-    public void testTopKWithIncrement() {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i, 10);
+    public void testSingleSpaceSaving() throws IOException {
+        String dataFile = prepareTestDate();
+        TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
+        TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer();
+
+        for (TopNCounterTest.TestDataConsumer consumer : new TopNCounterTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+            feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
         }
-        List<Counter<String>> topK = vs.topK(3);
-        for (Counter<String> c : topK) {
-            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+
+        FileUtils.forceDelete(new File(dataFile));
+
+        compareResult(spaceSavingCounter, accurateCounter);
+    }
+
+    private void compareResult(TopNCounterTest.TestDataConsumer firstConsumer, TopNCounterTest.TestDataConsumer secondConsumer) {
+        List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+        outputMsg("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
+        List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
+        outputMsg("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
+
+        int error = 0;
+        for (int i = 0; i < topResult1.size(); i++) {
+            outputMsg("Compare " + i);
+
+            if (isClose(topResult1.get(i).getSecond().doubleValue(), realSequence.get(i).getSecond().doubleValue())) {
+                //            if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst()) && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+                outputMsg("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+            } else {
+                outputMsg("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+                outputMsg("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+                error++;
+            }
         }
+
+        org.junit.Assert.assertEquals(0, error);
+    }
+    
+    /** Tolerance check: true when the two counts differ by less than 5.0. */
+    private boolean isClose(double value1, double value2) {
+        // Space-saving counts are estimates, so allow a small absolute gap.
+        double gap = Math.abs(value1 - value2);
+        boolean withinTolerance = gap < 5.0;
+        return withinTolerance;
+    }
 
     @Test
-    public void testTopKWithIncrementOutOfOrder() {
-        TopNCounter<String> vs_increment = new TopNCounter<String>(3);
-        TopNCounter<String> vs_single = new TopNCounter<String>(3);
-        String[] stream = {"A", "B", "C", "D", "A"};
-        Integer[] increments = {15, 20, 25, 30, 1};
-
-        for (int i = 0; i < stream.length; i++) {
-            vs_increment.offer(stream[i], increments[i]);
-            for (int k = 0; k < increments[i]; k++) {
-                vs_single.offer(stream[i]);
-            }
+    public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
+        String dataFile = prepareTestDate();
+
+        TopNCounterTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterTest.SpaceSavingConsumer[PARALLEL];
+
+        for (int i = 0; i < PARALLEL; i++) {
+            parallelCounters[i] = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
         }
-        System.out.println("Insert with counts vs. single inserts:");
-        System.out.println(vs_increment);
-        System.out.println(vs_single);
 
-        List<Counter<String>> topK_increment = vs_increment.topK(3);
-        List<Counter<String>> topK_single = vs_single.topK(3);
+        int slice = TOTAL_RECORDS / PARALLEL;
+        int startPosition = 0;
+        for (int i = 0; i < PARALLEL; i++) {
+            feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
+            startPosition += slice;
+        }
+
+        TopNCounterTest.SpaceSavingConsumer[] mergedCounters = singleMerge(parallelCounters);
+
+        TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer();
+        feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+        compareResult(mergedCounters[0], accurateCounter);
+        FileUtils.forceDelete(new File(dataFile));
+
+    }
+
+    private TopNCounterTest.SpaceSavingConsumer[] singleMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+        List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+        if (consumers.length == 1)
+            return consumers;
 
-        for (int i = 0; i < topK_increment.size(); i++) {
-            assertEquals(topK_increment.get(i).getItem(),
-                    topK_single.get(i).getItem());
+        TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * 10);
+        
+        for (int i=0, n=consumers.length; i<n; i++) {
+            merged.vs.merge(consumers[i].vs);
         }
+
+        merged.vs.retain(TOP_K * SPACE_SAVING_ROOM); // remove extra elements;
+        return new TopNCounterTest.SpaceSavingConsumer[] {merged};
+        
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testCounterSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
-        }
-        List<Counter<String>> topK = vs.topK(3);
-        for (Counter<String> c : topK) {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutput oo = new ObjectOutputStream(baos);
-            oo.writeObject(c);
-            oo.close();
-
-            ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-            Counter<String> clone = (Counter<String>) oi.readObject();
-            assertEquals(c.getCount(), clone.getCount(), 0.0001);
-            assertEquals(c.getError(), clone.getError(), 0.0001);
-            assertEquals(c.getItem(), clone.getItem());
+    private TopNCounterTest.SpaceSavingConsumer[] binaryMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+        List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+        if (consumers.length == 1)
+            return consumers;
+
+        
+        for (int i = 0, n = consumers.length; i < n; i = i + 2) {
+            if (i + 1 < n) {
+                consumers[i].vs.merge(consumers[i + 1].vs);
+            }
+
+            list.add(consumers[i]);
         }
+
+        return binaryMerge(list.toArray(new TopNCounterTest.SpaceSavingConsumer[list.size()]));
     }
+    
 
+    private void feedDataToConsumer(String dataFile, TopNCounterTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
+        long startTime = System.currentTimeMillis();
+        BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
+        int lineNum = 0;
+        String line = bufferedReader.readLine();
+        while (line != null) {
+            if (lineNum >= startLine && lineNum < endLine) {
+                consumer.addElement(line, 1.0);
+            }
+            line = bufferedReader.readLine();
+            lineNum++;
         }
 
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutput oo = new ObjectOutputStream(baos);
-        oo.writeObject(vs);
-        oo.close();
+        bufferedReader.close();
+        outputMsg("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+    }
 
-        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-        TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+    private void outputMsg(String msg) {
+        if (!verbose) { return; } // progress output only when the verbose flag is set
+        System.out.println(msg);
+    }
 
-        assertEquals(vs.toString(), clone.toString());
+    private static interface TestDataConsumer {
+        public void addElement(String elementKey, double value);
+
+        public List<Pair<String, Double>> getTopN(int k);
+
+        public long getSpentTime();
     }
 
+    private class SpaceSavingConsumer implements TopNCounterTest.TestDataConsumer {
+        private long timeSpent = 0;
+        protected TopNCounter<String> vs;
+
+        public SpaceSavingConsumer(int space) {
+            vs = new TopNCounter<String>(space);
 
-    @Test
-    public void testByteSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
         }
 
-        testSerialization(vs);
+        public void addElement(String key, double value) {
+            //outputMsg("Adding " + key + ":" + incrementCount);
+            long startTime = System.currentTimeMillis();
+            vs.offer(key, value);
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Counter<String>> tops = vs.topK(k);
+            List<Pair<String, Double>> allRecords = Lists.newArrayList();
 
-        // Empty
-        vs = new TopNCounter<String>(0);
-        testSerialization(vs);
+            for (Counter<String> counter : tops)
+                allRecords.add(new Pair(counter.getItem(), counter.getCount()));
+            timeSpent += (System.currentTimeMillis() - startTime);
+            return allRecords;
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
     }
 
-    private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
-        byte[] bytes = vs.toBytes();
-        TopNCounter<String> clone = new TopNCounter<String>(bytes);
+    private class HashMapConsumer implements TopNCounterTest.TestDataConsumer {
+
+        private long timeSpent = 0;
+        private Map<String, Double> hashMap;
+
+        public HashMapConsumer() {
+            hashMap = Maps.newHashMap();
+        }
+
+        public void addElement(String key, double value) {
+            long startTime = System.currentTimeMillis();
+            if (hashMap.containsKey(key)) {
+                hashMap.put(key, hashMap.get(key) + value);
+            } else {
+                hashMap.put(key, value);
+            }
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+            for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+                allRecords.add(new Pair(entry.getKey(), entry.getValue()));
+            }
 
-        assertEquals(vs.toString(), clone.toString());
+            Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
+                @Override
+                public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
+                    return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
+                }
+            });
+            timeSpent += (System.currentTimeMillis() - startTime);
+            return allRecords.subList(0, k);
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
     }
+
 }