You are viewing a plain text version of this content; the link to its canonical version was not preserved in this plain text copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/17 08:10:45 UTC

[1/7] incubator-kylin git commit: Add merge and retain to TopNCounter, and add unit tests

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-943 [created] 38201be29


Add merge and retain to TopNCounter, and add unit tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5980d95b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5980d95b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5980d95b

Branch: refs/heads/KYLIN-943
Commit: 5980d95bdd09b189af5fa81e9907245d353223bd
Parents: 7c50176
Author: shaofengshi <sh...@apache.org>
Authored: Mon Aug 24 22:22:05 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:20 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/topn/TopNCounter.java   |  50 ++-
 .../kylin/common/topn/TopNCounterBasicTest.java | 174 +++++++++
 .../common/topn/TopNCounterCombinationTest.java |  65 ++++
 .../common/topn/TopNCounterComparisonTest.java  | 281 ---------------
 .../kylin/common/topn/TopNCounterTest.java      | 350 +++++++++++++------
 5 files changed, 519 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 0a45d0b..2f337c2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -48,6 +48,10 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
             this.count = count;
             this.counterList = new DoublyLinkedList<Counter<T>>();
         }
+        /** Returns the number of counters currently held in this bucket's counterList. */
+        public int size() {
+            return counterList.size();
+        }
     }
 
     protected int capacity;
@@ -297,33 +301,36 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
 
     }
 
-    public TopNCounter<T> merge(TopNCounter<T> another) throws IOException, ClassNotFoundException {
-        TopNCounter<T> secondCounter = new TopNCounter<T>(another.capacity);
-        secondCounter.fromBytes(another.toBytes());
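+    /**
+     * Merges another counter into this one. Note: {@code another} is modified by this call (matched entries are removed from its counter map), so pass in a copy if it must be preserved.
+     * @param another the counter to merge in; mutated by this method
+     * @return this counter, after the merge
+     */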
+    public TopNCounter<T> merge(TopNCounter<T> another) {
         double m1 = 0.0, m2 = 0.0;
         if (this.size() >= this.capacity) {
             m1 = this.bucketList.tail().getValue().count;
         }
 
-        if (secondCounter.size() >= secondCounter.capacity) {
-            m2 = secondCounter.bucketList.tail().getValue().count;
+        if (another.size() >= another.capacity) {
+            m2 = another.bucketList.tail().getValue().count;
         }
 
         for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
             T item = entry.getKey();
-            ListNode2<Counter<T>> existing = secondCounter.counterMap.get(item);
+            ListNode2<Counter<T>> existing = another.counterMap.get(item);
             if (existing != null) {
-                this.offer(item, secondCounter.counterMap.get(item).getValue().count);
-                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error+ secondCounter.counterMap.get(item).getValue().error;
+                this.offer(item, another.counterMap.get(item).getValue().count);
+                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + another.counterMap.get(item).getValue().error;
 
-                secondCounter.counterMap.remove(item);
+                another.counterMap.remove(item);
             } else {
                 this.offer(item, m2);
                 this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
             }
         }
 
-        for (Map.Entry<T, ListNode2<Counter<T>>> entry : secondCounter.counterMap.entrySet()) {
+        for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
             T item = entry.getKey();
             double counter = entry.getValue().getValue().count;
             double error = entry.getValue().getValue().error;
@@ -333,4 +340,27 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
 
         return this;
     }
+
+    /**
+     * Shrinks this counter to {@code newCapacity}, evicting the lowest-count entries one at a time
+     * from the tail bucket until no more than {@code newCapacity} entries remain.
+     * @param newCapacity the new capacity; must be positive
+     * @throws IllegalArgumentException if {@code newCapacity} is not positive
+     */
+    public void retain(int newCapacity) {
+        if (newCapacity <= 0) {
+            throw new IllegalArgumentException("newCapacity must be positive, but was " + newCapacity);
+        }
+        this.capacity = newCapacity;
+        while (this.size() > newCapacity) {
+            ListNode2<Bucket> bucketNode = bucketList.tail(); // smallest-count bucket, as merge() assumes
+            Bucket bucket = bucketNode.getValue();
+            ListNode2<Counter<T>> counterNode = bucket.counterList.tail();
+            bucket.counterList.remove(counterNode);
+            this.counterMap.remove(counterNode.getValue().getItem());
+            if (bucket.counterList.isEmpty()) {
+                bucketList.remove(bucketNode); // unlink emptied buckets from the bucket list
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
new file mode 100644
index 0000000..0d59314
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Basic functional and serialization tests for {@link TopNCounter}. */
+public class TopNCounterBasicTest {
+
+    private static final int NUM_ITERATIONS = 100000; // NOTE(review): not referenced by any test in this class
+
+    @Test
+    public void testTopNCounter() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+        for (String i : stream) {
+                vs.offer(i);
+            /*
+             * Print the counter state after each offer so evictions can be inspected by eye;
+             * this test only prints and makes no assertions.
+             */
+            System.out.println(vs);
+        }
+
+        List<Counter<String>> topk = vs.topK(6);
+        
+        for(Counter<String> top : topk) {
+            System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
+        }
+        
+    }
+
+    @Test
+    public void testTopK() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrement() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i, 10);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrementOutOfOrder() {
+        TopNCounter<String> vs_increment = new TopNCounter<String>(3);
+        TopNCounter<String> vs_single = new TopNCounter<String>(3);
+        String[] stream = {"A", "B", "C", "D", "A"};
+        Integer[] increments = {15, 20, 25, 30, 1};
+
+        for (int i = 0; i < stream.length; i++) {
+            vs_increment.offer(stream[i], increments[i]);
+            for (int k = 0; k < increments[i]; k++) {
+                vs_single.offer(stream[i]);
+            }
+        }
+        System.out.println("Insert with counts vs. single inserts:");
+        System.out.println(vs_increment);
+        System.out.println(vs_single);
+
+        List<Counter<String>> topK_increment = vs_increment.topK(3);
+        List<Counter<String>> topK_single = vs_single.topK(3);
+
+        for (int i = 0; i < topK_increment.size(); i++) {
+            assertEquals(topK_increment.get(i).getItem(),
+                    topK_single.get(i).getItem());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCounterSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutput oo = new ObjectOutputStream(baos);
+            oo.writeObject(c);
+            oo.close();
+
+            ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+            Counter<String> clone = (Counter<String>) oi.readObject();
+            assertEquals(c.getCount(), clone.getCount(), 0.0001);
+            assertEquals(c.getError(), clone.getError(), 0.0001);
+            assertEquals(c.getItem(), clone.getItem());
+        }
+    }
+
+    /** Java-serializes a whole populated TopNCounter and checks the clone's toString() matches. */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutput oo = new ObjectOutputStream(baos);
+        oo.writeObject(vs);
+        oo.close();
+
+        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+        TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+
+        assertEquals(vs.toString(), clone.toString());
+    }
+
+    /** Round-trips a populated counter and an empty zero-capacity counter through toBytes(). */
+    @Test
+    public void testByteSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        testSerialization(vs);
+
+        // Empty, zero-capacity counter
+        vs = new TopNCounter<String>(0);
+        testSerialization(vs);
+    }
+    /** Serializes {@code vs} with toBytes(), rebuilds it via the byte[] constructor and compares toString() output. */
+    private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
+        byte[] bytes = vs.toBytes();
+        TopNCounter<String> clone = new TopNCounter<String>(bytes);
+
+        assertEquals(vs.toString(), clone.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
new file mode 100644
index 0000000..afde8f7
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
+public class TopNCounterCombinationTest extends TopNCounterTest {
+    // The JUnit Parameterized runner executes the (inherited) @Test methods once per row of configs().
+    @Parameterized.Parameters
+    public static Collection<Integer[]> configs() {
+        // each row: { topK, keySpaceRate, spaceSavingRate }; the key space is topK * keySpaceRate
+        return Arrays.asList(new Integer[][] {
+                /*
+                // with 20X space
+                { 100, 10, 20 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 20 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 20 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 20 }, // top 100 among 100,000 keys (top 0.1%)
+                
+                */
+                // with 50X space
+                { 100, 10, 50 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 50 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 50 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 50 }, // top 100 among 100,000 keys (top 0.1%)
+
+                /*
+                // with 100X space
+                { 100, 10, 100 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 100 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 100 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 100 }, // top 100 among 100,000 keys (top 0.1%)
+                */
+        });
+    }
+
+    public TopNCounterCombinationTest(int topK, int keySpaceRate, int spaceSavingRate) throws Exception {
+        super(); // installs TopNCounterTest's defaults; the static settings are then overridden below
+        TOP_K = topK;
+        KEY_SPACE = TOP_K * keySpaceRate;
+        SPACE_SAVING_ROOM = spaceSavingRate;
+        TOTAL_RECORDS = 1000000; // 1 million
+        PARALLEL = 50;
+        verbose = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
deleted file mode 100644
index fb82522..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Copyright (C) 2011 Clearspring Technologies, Inc. 
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.topn;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import junit.framework.Assert;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.math3.distribution.ZipfDistribution;
-import org.apache.kylin.common.util.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class TopNCounterComparisonTest {
-
-    private static final int TOP_K = 100;
-
-    private static final int KEY_SPACE = 100 * TOP_K;
-
-    private static final int TOTAL_RECORDS = 100 * KEY_SPACE;
-
-    private static final int SPACE_SAVING_ROOM = 100;
-
-    @Before
-    public void setup() {
-    }
-
-    @After
-    public void tearDown() {
-    }
-
-    protected String prepareTestDate() throws IOException {
-        String[] allKeys = new String[KEY_SPACE];
-
-        for (int i = 0; i < KEY_SPACE; i++) {
-            allKeys[i] = RandomStringUtils.randomAlphabetic(10);
-        }
-
-        System.out.println("Start to create test random data...");
-        long startTime = System.currentTimeMillis();
-        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
-        int keyIndex;
-
-        File tempFile = File.createTempFile("ZipfDistribution", ".txt");
-
-        if (tempFile.exists())
-            FileUtils.forceDelete(tempFile);
-        FileWriter fw = new FileWriter(tempFile);
-        try {
-            for (int i = 0; i < TOTAL_RECORDS; i++) {
-                keyIndex = zipf.sample();
-                fw.write(allKeys[keyIndex]);
-                fw.write('\n');
-            }
-        } finally {
-            if (fw != null)
-                fw.close();
-        }
-
-        System.out.println("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
-        System.out.println("Test data in : " + tempFile.getAbsolutePath());
-
-        return tempFile.getAbsolutePath();
-    }
-
-    //@Test
-    public void testCorrectness() throws IOException {
-        String dataFile = prepareTestDate();
-        TopNCounterComparisonTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterComparisonTest.SpaceSavingConsumer();
-        TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
-
-        for (TopNCounterComparisonTest.TestDataConsumer consumer : new TopNCounterComparisonTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
-            feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
-        }
-
-        FileUtils.forceDelete(new File(dataFile));
-
-        compareResult(spaceSavingCounter, accurateCounter);
-    }
-
-    private void compareResult(TopNCounterComparisonTest.TestDataConsumer firstConsumer, TopNCounterComparisonTest.TestDataConsumer secondConsumer) {
-        List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
-        System.out.println("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
-        List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
-        System.out.println("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
-
-        int error = 0;
-        for (int i = 0; i < topResult1.size(); i++) {
-            System.out.println("Compare " + i);
-
-            //            if (topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
-            if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst())
-                    && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
-                System.out.println("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
-            } else {
-                System.out.println("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
-                System.out.println("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
-                error++;
-            }
-        }
-
-        Assert.assertEquals(0, error);
-    }
-
-    @Test
-    public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
-        String dataFile = prepareTestDate();
-
-        int PARALLEL = 10;
-        TopNCounterComparisonTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterComparisonTest.SpaceSavingConsumer[PARALLEL];
-
-        for (int i = 0; i < PARALLEL; i++) {
-            parallelCounters[i] = new TopNCounterComparisonTest.SpaceSavingConsumer();
-        }
-
-        int slice = TOTAL_RECORDS / PARALLEL;
-        int startPosition = 0;
-        for (int i = 0; i < PARALLEL; i++) {
-            feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
-            startPosition += slice;
-        }
-
-        // merge counters
-
-        //        for (int i = 1; i < PARALLEL; i++) {
-        //            parallelCounters[0].vs.merge(parallelCounters[i].vs);
-        //        }
-
-        TopNCounterComparisonTest.SpaceSavingConsumer[] mergedCounters = mergeSpaceSavingConsumer(parallelCounters);
-
-        TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
-        feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
-
-        compareResult(mergedCounters[0], accurateCounter);
-        FileUtils.forceDelete(new File(dataFile));
-
-    }
-
-    private TopNCounterComparisonTest.SpaceSavingConsumer[] mergeSpaceSavingConsumer(TopNCounterComparisonTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
-        List<TopNCounterComparisonTest.SpaceSavingConsumer> list = Lists.newArrayList();
-        if (consumers.length == 1)
-            return consumers;
-
-        for (int i = 0, n = consumers.length; i < n; i = i + 2) {
-            if (i + 1 < n) {
-                consumers[i].vs.merge(consumers[i + 1].vs);
-            }
-
-            list.add(consumers[i]);
-        }
-
-        return mergeSpaceSavingConsumer(list.toArray(new TopNCounterComparisonTest.SpaceSavingConsumer[list.size()]));
-    }
-
-    private void feedDataToConsumer(String dataFile, TopNCounterComparisonTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
-        long startTime = System.currentTimeMillis();
-        BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
-
-        int lineNum = 0;
-        String line = bufferedReader.readLine();
-        while (line != null) {
-            if (lineNum >= startLine && lineNum < endLine) {
-                consumer.addElement(line, 1.0);
-            }
-            line = bufferedReader.readLine();
-            lineNum++;
-        }
-
-        bufferedReader.close();
-        System.out.println("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
-    }
-
-    private static interface TestDataConsumer {
-        public void addElement(String elementKey, double value);
-
-        public List<Pair<String, Double>> getTopN(int k);
-
-        public long getSpentTime();
-    }
-
-    private class SpaceSavingConsumer implements TopNCounterComparisonTest.TestDataConsumer {
-        private long timeSpent = 0;
-        protected TopNCounter<String> vs;
-
-        public SpaceSavingConsumer() {
-            vs = new TopNCounter<String>(TOP_K * SPACE_SAVING_ROOM);
-
-        }
-
-        public void addElement(String key, double value) {
-            //System.out.println("Adding " + key + ":" + incrementCount);
-            long startTime = System.currentTimeMillis();
-            vs.offer(key, value);
-            timeSpent += (System.currentTimeMillis() - startTime);
-        }
-
-        @Override
-        public List<Pair<String, Double>> getTopN(int k) {
-            long startTime = System.currentTimeMillis();
-            List<Counter<String>> tops = vs.topK(k);
-            List<Pair<String, Double>> allRecords = Lists.newArrayList();
-
-            for (Counter<String> counter : tops)
-                allRecords.add(new Pair(counter.getItem(), counter.getCount()));
-            timeSpent += (System.currentTimeMillis() - startTime);
-            return allRecords;
-        }
-
-        @Override
-        public long getSpentTime() {
-            return timeSpent;
-        }
-    }
-
-    private class HashMapConsumer implements TopNCounterComparisonTest.TestDataConsumer {
-
-        private long timeSpent = 0;
-        private Map<String, Double> hashMap;
-
-        public HashMapConsumer() {
-            hashMap = Maps.newHashMap();
-        }
-
-        public void addElement(String key, double value) {
-            long startTime = System.currentTimeMillis();
-            if (hashMap.containsKey(key)) {
-                hashMap.put(key, hashMap.get(key) + value);
-            } else {
-                hashMap.put(key, value);
-            }
-            timeSpent += (System.currentTimeMillis() - startTime);
-        }
-
-        @Override
-        public List<Pair<String, Double>> getTopN(int k) {
-            long startTime = System.currentTimeMillis();
-            List<Pair<String, Double>> allRecords = Lists.newArrayList();
-
-            for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
-                allRecords.add(new Pair(entry.getKey(), entry.getValue()));
-            }
-
-            Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
-                @Override
-                public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
-                    return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
-                }
-            });
-            timeSpent += (System.currentTimeMillis() - startTime);
-            return allRecords.subList(0, k);
-        }
-
-        @Override
-        public long getSpentTime() {
-            return timeSpent;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index 23620d1..e5350ba 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -16,159 +16,289 @@
 
 package org.apache.kylin.common.topn;
 
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
 import org.junit.Test;
 
 import java.io.*;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+public class TopNCounterTest {
 
+    protected static int TOP_K;
 
-public class TopNCounterTest {
+    protected static int KEY_SPACE;
 
-    private static final int NUM_ITERATIONS = 100000;
+    protected static int TOTAL_RECORDS;
 
-    @Test
-    public void testTopNCounter() {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
-        for (String i : stream) {
-                vs.offer(i);
-            /*
-        for(String s : vs.poll(3))
-        System.out.print(s+" ");
-             */
-            System.out.println(vs);
-        }
-
-        List<Counter<String>> topk = vs.topK(6);
-        
-        for(Counter<String> top : topk) {
-            System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
-        }
-        
+    protected static int SPACE_SAVING_ROOM;
+
+    protected static int PARALLEL = 10;
+
+    protected static boolean verbose = true;
+
+    public TopNCounterTest() {
+        TOP_K = 100;
+        KEY_SPACE = 100 * TOP_K;
+        TOTAL_RECORDS = 1000000; // 1 million
+        SPACE_SAVING_ROOM = 100;
     }
 
-    @Test
-    public void testTopK() {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
+    protected String prepareTestDate() throws IOException {
+        String[] allKeys = new String[KEY_SPACE];
+
+        for (int i = 0; i < KEY_SPACE; i++) {
+            allKeys[i] = RandomStringUtils.randomAlphabetic(10);
         }
-        List<Counter<String>> topK = vs.topK(3);
-        for (Counter<String> c : topK) {
-            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+
+        outputMsg("Start to create test random data...");
+        long startTime = System.currentTimeMillis();
+        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+        int keyIndex;
+
+        File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+        if (tempFile.exists())
+            FileUtils.forceDelete(tempFile);
+        FileWriter fw = new FileWriter(tempFile);
+        try {
+            for (int i = 0; i < TOTAL_RECORDS; i++) {
+                keyIndex = zipf.sample();
+                fw.write(allKeys[keyIndex]);
+                fw.write('\n');
+            }
+        } finally {
+            if (fw != null)
+                fw.close();
         }
+
+        outputMsg("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
+        outputMsg("Test data in : " + tempFile.getAbsolutePath());
+
+        return tempFile.getAbsolutePath();
     }
 
     @Test
-    public void testTopKWithIncrement() {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i, 10);
+    public void testSingleSpaceSaving() throws IOException {
+        String dataFile = prepareTestDate();
+        TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
+        TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer();
+
+        for (TopNCounterTest.TestDataConsumer consumer : new TopNCounterTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+            feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
         }
-        List<Counter<String>> topK = vs.topK(3);
-        for (Counter<String> c : topK) {
-            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+
+        FileUtils.forceDelete(new File(dataFile));
+
+        compareResult(spaceSavingCounter, accurateCounter);
+    }
+
+    private void compareResult(TopNCounterTest.TestDataConsumer firstConsumer, TopNCounterTest.TestDataConsumer secondConsumer) {
+        List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+        outputMsg("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
+        List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
+        outputMsg("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
+        org.junit.Assert.assertEquals("top-N result sizes differ", realSequence.size(), topResult1.size()); // else entries go unchecked or get(i) throws
+        int error = 0;
+        for (int i = 0; i < topResult1.size(); i++) {
+            outputMsg("Compare " + i);
+
+            if (isClose(topResult1.get(i).getSecond().doubleValue(), realSequence.get(i).getSecond().doubleValue())) {
+                // Counts only (within isClose tolerance), not keys: keys tied on count may sort either way.
+                outputMsg("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+            } else {
+                outputMsg("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+                outputMsg("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+                error++;
+            }
+        }
+
+        org.junit.Assert.assertEquals(0, error);
+    }
+    
+    private boolean isClose(double value1, double value2) {
+        // Space-saving counts are estimates, so "close" means the absolute difference
+        // from the exact count is strictly below a fixed tolerance.
+        // A NaN on either side never compares as close.
+        final double tolerance = 5.0;
+        return Math.abs(value1 - value2) < tolerance;
+    }
 
     @Test
-    public void testTopKWithIncrementOutOfOrder() {
-        TopNCounter<String> vs_increment = new TopNCounter<String>(3);
-        TopNCounter<String> vs_single = new TopNCounter<String>(3);
-        String[] stream = {"A", "B", "C", "D", "A"};
-        Integer[] increments = {15, 20, 25, 30, 1};
-
-        for (int i = 0; i < stream.length; i++) {
-            vs_increment.offer(stream[i], increments[i]);
-            for (int k = 0; k < increments[i]; k++) {
-                vs_single.offer(stream[i]);
-            }
+    public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
+        String dataFile = prepareTestDate();
+
+        TopNCounterTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterTest.SpaceSavingConsumer[PARALLEL];
+
+        for (int i = 0; i < PARALLEL; i++) {
+            parallelCounters[i] = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
         }
-        System.out.println("Insert with counts vs. single inserts:");
-        System.out.println(vs_increment);
-        System.out.println(vs_single);
 
-        List<Counter<String>> topK_increment = vs_increment.topK(3);
-        List<Counter<String>> topK_single = vs_single.topK(3);
+        int slice = TOTAL_RECORDS / PARALLEL;
+        int startPosition = 0;
+        for (int i = 0; i < PARALLEL; i++) {
+            feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
+            startPosition += slice;
+        }
+
+        TopNCounterTest.SpaceSavingConsumer[] mergedCounters = singleMerge(parallelCounters);
+
+        TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer();
+        feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+        compareResult(mergedCounters[0], accurateCounter);
+        FileUtils.forceDelete(new File(dataFile));
+
+    }
+
+    private TopNCounterTest.SpaceSavingConsumer[] singleMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+        List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+        if (consumers.length == 1)
+            return consumers;
 
-        for (int i = 0; i < topK_increment.size(); i++) {
-            assertEquals(topK_increment.get(i).getItem(),
-                    topK_single.get(i).getItem());
+        TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * 10);
+        
+        for (int i=0, n=consumers.length; i<n; i++) {
+            merged.vs.merge(consumers[i].vs);
         }
+
+        merged.vs.retain(TOP_K * SPACE_SAVING_ROOM); // remove extra elements;
+        return new TopNCounterTest.SpaceSavingConsumer[] {merged};
+        
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testCounterSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
-        }
-        List<Counter<String>> topK = vs.topK(3);
-        for (Counter<String> c : topK) {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutput oo = new ObjectOutputStream(baos);
-            oo.writeObject(c);
-            oo.close();
-
-            ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-            Counter<String> clone = (Counter<String>) oi.readObject();
-            assertEquals(c.getCount(), clone.getCount(), 0.0001);
-            assertEquals(c.getError(), clone.getError(), 0.0001);
-            assertEquals(c.getItem(), clone.getItem());
+    private TopNCounterTest.SpaceSavingConsumer[] binaryMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+        List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+        if (consumers.length == 1)
+            return consumers;
+
+        
+        for (int i = 0, n = consumers.length; i < n; i = i + 2) {
+            if (i + 1 < n) {
+                consumers[i].vs.merge(consumers[i + 1].vs);
+            }
+
+            list.add(consumers[i]);
         }
+
+        return binaryMerge(list.toArray(new TopNCounterTest.SpaceSavingConsumer[list.size()]));
     }
+    
 
+    private void feedDataToConsumer(String dataFile, TopNCounterTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
+        long startTime = System.currentTimeMillis();
+        BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));

-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
+        try {
+            // Feed only lines [startLine, endLine) with value 1.0; stop reading once endLine is reached.
+            int lineNum = 0;
+            for (String line = bufferedReader.readLine(); line != null && lineNum < endLine; line = bufferedReader.readLine(), lineNum++) {
+                if (lineNum >= startLine) consumer.addElement(line, 1.0);
+            }
+        } finally {
+            bufferedReader.close(); // release the file even if a consumer or readLine throws
         }

-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutput oo = new ObjectOutputStream(baos);
-        oo.writeObject(vs);
-        oo.close();
+        // Wall-clock time of the whole read/feed pass, truncated to whole seconds.
+        outputMsg("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+    }
 
-        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-        TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+    private void outputMsg(String msg) {
+        if (verbose)
+            System.out.println(msg);
+    }
 
-        assertEquals(vs.toString(), clone.toString());
+    private static interface TestDataConsumer {
+        public void addElement(String elementKey, double value);
+
+        public List<Pair<String, Double>> getTopN(int k);
+
+        public long getSpentTime();
     }
 
+    private class SpaceSavingConsumer implements TopNCounterTest.TestDataConsumer {
+        private long timeSpent = 0;
+        protected TopNCounter<String> vs;
+
+        public SpaceSavingConsumer(int space) {
+            vs = new TopNCounter<String>(space);
 
-    @Test
-    public void testByteSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
         }
 
-        testSerialization(vs);
+        public void addElement(String key, double value) {
+            //outputMsg("Adding " + key + ":" + incrementCount);
+            long startTime = System.currentTimeMillis();
+            vs.offer(key, value);
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Counter<String>> tops = vs.topK(k);
+            List<Pair<String, Double>> allRecords = Lists.newArrayListWithCapacity(tops.size());
 
-        // Empty
-        vs = new TopNCounter<String>(0);
-        testSerialization(vs);
+            for (Counter<String> counter : tops) // keeps the order returned by topK
+                allRecords.add(new Pair<String, Double>(counter.getItem(), counter.getCount()));
+            timeSpent += (System.currentTimeMillis() - startTime);
+            return allRecords;
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
     }
 
-    private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
-        byte[] bytes = vs.toBytes();
-        TopNCounter<String> clone = new TopNCounter<String>(bytes);
+    private class HashMapConsumer implements TopNCounterTest.TestDataConsumer {
+
+        private long timeSpent = 0;
+        private Map<String, Double> hashMap;
+
+        public HashMapConsumer() {
+            hashMap = Maps.newHashMap();
+        }
+
+        public void addElement(String key, double value) {
+            long startTime = System.currentTimeMillis();
+            if (hashMap.containsKey(key)) {
+                hashMap.put(key, hashMap.get(key) + value);
+            } else {
+                hashMap.put(key, value);
+            }
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            // Exact reference answer: every distinct key with its summed value, sorted descending.
+            long startTime = System.currentTimeMillis();
+            List<Pair<String, Double>> allRecords = Lists.newArrayListWithCapacity(hashMap.size());
+            for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+                allRecords.add(new Pair<String, Double>(entry.getKey(), entry.getValue()));
+            }
 
-        assertEquals(vs.toString(), clone.toString());
+            Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
+                @Override
+                public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
+                    return Double.compare(o2.getSecond(), o1.getSecond()); // descending by value
+                }
+            });
+            timeSpent += (System.currentTimeMillis() - startTime);
+            return allRecords.subList(0, Math.min(k, allRecords.size())); // no IOOBE when fewer than k keys
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
     }
+
 }



[6/7] incubator-kylin git commit: KYLIN-943 Update test case

Posted by sh...@apache.org.
KYLIN-943 Update test case


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/af4883e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/af4883e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/af4883e0

Branch: refs/heads/KYLIN-943
Commit: af4883e0185f71a239c7b7deb7609214a23b99af
Parents: f3fe0e5
Author: shaofengshi <sh...@apache.org>
Authored: Sun Sep 6 17:01:53 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800

----------------------------------------------------------------------
 .../common/topn/TopNCounterCombinationTest.java |  2 ++
 .../serializer/TopNCounterSerializerTest.java   | 24 ++++++++++++++++++--
 .../kylin/storage/hybrid/HybridInstance.java    |  2 +-
 .../localmeta/project/default.json              |  5 ++++
 .../test_case_data/sandbox/kylin.properties     | 10 ++++----
 .../kylin/job/BuildCubeWithEngineTest.java      | 18 ++++++++++++++-
 6 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index cc0557e..3d809df 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -16,6 +16,7 @@
 
 package org.apache.kylin.common.topn;
 
+import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -23,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collection;
 
 @RunWith(Parameterized.class)
+@Ignore ("For collecting accuracy statistics, not for functional test")
 public class TopNCounterCombinationTest extends TopNCounterTest {
 
     @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
index 050193b..3c88446 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
@@ -3,6 +3,7 @@ package org.apache.kylin.metadata.measure.serializer;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.model.DataType;
 import org.junit.Assert;
 import org.junit.Test;
@@ -18,14 +19,14 @@ public class TopNCounterSerializerTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testCounterSerialization() {
+    public void testSerialization() {
         TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
         Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
         for (Integer i : stream) {
             vs.offer(new ByteArray(Bytes.toBytes(i)));
         }
 
-        ByteBuffer out = ByteBuffer.allocate(1024 * 1024);
+        ByteBuffer out = ByteBuffer.allocate(1024);
         serializer.serialize(vs, out);
         
         byte[] copyBytes = new byte[out.position()];
@@ -37,5 +38,24 @@ public class TopNCounterSerializerTest {
         Assert.assertEquals(vs.toString(), vsNew.toString());
 
     }
+    
+    @Test
+    public void testValueOf() {
+        // Expected counter: a single ByteArray(1) key holding VLong-encoded 20, offered with count 1.0.
+        TopNCounter<ByteArray> origin = new TopNCounter<ByteArray>(10);
+        ByteArray key = new ByteArray(1);
+        ByteBuffer byteBuffer = key.asBuffer();
+        BytesUtil.writeVLong(20L, byteBuffer);
+        origin.offer(key, 1.0);
+        // Hand-built bytes for valueOf. NOTE(review): assumed to match the layout valueOf parses - confirm.
+        byteBuffer = ByteBuffer.allocate(1024);
+        byteBuffer.putInt(1);
+        byteBuffer.putInt(20);
+        byteBuffer.putDouble(1.0);
+        TopNCounter<ByteArray> counter = serializer.valueOf(byteBuffer.array());
+
+        // Both counters should render identically.
+        Assert.assertEquals(origin.toString(), counter.toString());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index e9f0975..6ad27d5 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -277,7 +277,7 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
     }
 
     public IRealization getLatestRealization() {
-        if (realizations.length > 0) {
+        if (getRealizations().length > 0) {
             return realizations[realizations.length - 1];
         }
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/examples/test_case_data/localmeta/project/default.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/project/default.json b/examples/test_case_data/localmeta/project/default.json
index e4118ca..71eb1fa 100644
--- a/examples/test_case_data/localmeta/project/default.json
+++ b/examples/test_case_data/localmeta/project/default.json
@@ -23,6 +23,11 @@
       "realization": "test_kylin_cube_without_slr_left_join_empty"
     },
     {
+      "name": "test_kylin_cube_topn",
+      "type": "CUBE",
+      "realization": "test_kylin_cube_topn"
+    },
+    {
       "name": "test_kylin_ii_left_join",
       "type": "INVERTED_INDEX",
       "realization": "test_kylin_ii_left_join"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index beb9c44..5b05f87 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -16,16 +16,16 @@ kylin.job.mapreduce.default.reduce.input.mb=500
 
 # If true, job engine will not assume that hadoop CLI reside on the same server as it self
 # you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password
-kylin.job.run.as.remote.cmd=false
+kylin.job.run.as.remote.cmd=true
 
 # Only necessary when kylin.job.run.as.remote.cmd=true
-kylin.job.remote.cli.hostname=
+kylin.job.remote.cli.hostname=sandbox
 
 # Only necessary when kylin.job.run.as.remote.cmd=true
-kylin.job.remote.cli.username=
+kylin.job.remote.cli.username=root
 
 # Only necessary when kylin.job.run.as.remote.cmd=true
-kylin.job.remote.cli.password=
+kylin.job.remote.cli.password=hadoop
 
 # Used by test cases to prepare synthetic data for sample cube
 kylin.job.remote.cli.working.dir=/tmp/kylin
@@ -34,7 +34,7 @@ kylin.job.remote.cli.working.dir=/tmp/kylin
 kylin.job.concurrent.max.limit=10
 
 # Whether calculate cube in mem in each mapper;
-kylin.job.cubing.inMem=true
+kylin.job.cubing.inMem=false
 
 #the percentage of the sampling, default 25%
 kylin.job.cubing.inMem.sampling.percent=25

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index d7eb3cf..7c6b028 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -125,7 +125,8 @@ public class BuildCubeWithEngineTest {
     }
 
     private void testInner() throws Exception {
-        String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", };
+        String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube" };
+        // Each entry names a test method defined in this class that is called by reflection.
         runTestAndAssertSucceed(testCase);
     }
 
@@ -183,6 +184,21 @@ public class BuildCubeWithEngineTest {
         }
     }
 
+
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testInnerJoinTopNCube() throws Exception {
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long date1 = 0;
+        long date2 = f.parse("2013-01-01").getTime();
+        long date3 = f.parse("2022-01-01").getTime();
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment("test_kylin_cube_topn", date1, date2));
+        result.add(buildSegment("test_kylin_cube_topn", date2, date3));
+        return result;
+    }
+
     @SuppressWarnings("unused")
     // called by reflection
     private List<String> testInnerJoinCube2() throws Exception {


[2/7] incubator-kylin git commit: KYLIN-943 initial commit for function and performance test

Posted by sh...@apache.org.
KYLIN-943 initial commit for function and performance test


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7c501760
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7c501760
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7c501760

Branch: refs/heads/KYLIN-943
Commit: 7c501760050e0d9a9160442ecf87dca554ededfa
Parents: 4456bb1
Author: shaofengshi <sh...@apache.org>
Authored: Mon Aug 24 10:05:27 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:20 2015 +0800

----------------------------------------------------------------------
 core-common/pom.xml                             |   4 +
 .../org/apache/kylin/common/topn/Counter.java   |  85 +++++
 .../kylin/common/topn/DoublyLinkedList.java     | 188 +++++++++++
 .../org/apache/kylin/common/topn/ITopK.java     |  53 +++
 .../org/apache/kylin/common/topn/ListNode2.java |  51 +++
 .../apache/kylin/common/topn/TopNCounter.java   | 336 +++++++++++++++++++
 .../common/topn/TopNCounterComparisonTest.java  | 281 ++++++++++++++++
 .../kylin/common/topn/TopNCounterTest.java      | 174 ++++++++++
 pom.xml                                         |   6 +
 9 files changed, 1178 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 577db42..f9e715d 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -65,6 +65,10 @@
             <artifactId>commons-email</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-httpclient</groupId>
             <artifactId>commons-httpclient</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
new file mode 100644
index 0000000..0f7d8de
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * One entry tracked by a {@link TopNCounter}: an item plus its count and error values.
+ * Modified from Counter.java in https://github.com/addthis/stream-lib
+ * @param <T> type of the counted item
+ */
+public class Counter<T> implements Externalizable {
+    // List node wrapping the TopNCounter bucket given to the constructor; not written or read by (write|read)External.
+    protected ListNode2<TopNCounter<T>.Bucket> bucketNode;
+
+    protected T item;
+    protected double count;
+    protected double error;
+
+    /**
+     * Public no-arg constructor required by {@link Externalizable}; state is restored by readExternal.
+     */
+    public Counter() {
+    }
+    /** Creates a counter for {@code item} attached to the given bucket node, with count and error at 0. */
+    public Counter(ListNode2<TopNCounter<T>.Bucket> bucket, T item) {
+        this.bucketNode = bucket;
+        this.count = 0;
+        this.error = 0;
+        this.item = item;
+    }
+    /** Returns the tracked item. */
+    public T getItem() {
+        return item;
+    }
+    /** Returns the count currently recorded for the item. */
+    public double getCount() {
+        return count;
+    }
+    /** Returns the error value recorded alongside the count. */
+    public double getError() {
+        return error;
+    }
+    /** Formats this counter as {@code item:count:error}. */
+    @Override
+    public String toString() {
+        return item + ":" + count + ':' + error;
+    }
+    /** Reads item, count and error in the order writeExternal wrote them; bucketNode is left untouched. */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        item = (T) in.readObject();
+        count = in.readDouble();
+        error = in.readDouble();
+    }
+    /** Writes item, count and error only; the bucket link is not serialized. */
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(item);
+        out.writeDouble(count);
+        out.writeDouble(error);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
new file mode 100644
index 0000000..0942b84
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Modified from DoublyLinkedList.java in https://github.com/addthis/stream-lib
+ * <p>
+ * A doubly linked list that exposes its {@link ListNode2} nodes, so callers can keep a node
+ * handle and later unlink it, or insert next to it, in constant time.
+ * Orientation: {@code tail} is the first element and {@code head} the last; iteration runs
+ * from tail to head by following {@code next} links.
+ *
+ * @param <T> element type
+ */
+public class DoublyLinkedList<T> implements Iterable<T> {
+
+    protected int size;
+    protected ListNode2<T> tail;
+    protected ListNode2<T> head;
+
+    /**
+     * Append to head of list
+     *
+     * @param value value to append
+     * @return the newly created node
+     */
+    public ListNode2<T> add(T value) {
+        ListNode2<T> node = new ListNode2<T>(value);
+        if (size++ == 0) {
+            tail = node;
+        } else {
+            node.prev = head;
+            head.next = node;
+        }
+
+        head = node;
+
+        return node;
+    }
+
+    /**
+     * Prepend to tail of list
+     *
+     * @param value value to prepend
+     * @return the newly created node
+     */
+    public ListNode2<T> enqueue(T value) {
+        ListNode2<T> node = new ListNode2<T>(value);
+        if (size++ == 0) {
+            head = node;
+        } else {
+            node.next = tail;
+            tail.prev = node;
+        }
+
+        tail = node;
+
+        return node;
+    }
+
+    /**
+     * Appends an existing node at the head of the list. Any links the node still carries
+     * (for example from a list it was removed from) are overwritten.
+     *
+     * @param node node to append
+     */
+    public void add(ListNode2<T> node) {
+        node.prev = head;
+        node.next = null;
+
+        if (size++ == 0) {
+            tail = node;
+        } else {
+            head.next = node;
+        }
+
+        head = node;
+    }
+
+    /**
+     * Inserts {@code value} directly after {@code node}, i.e. on its head side.
+     *
+     * @param node  node already in this list
+     * @param value value to insert
+     * @return the newly created node
+     */
+    public ListNode2<T> addAfter(ListNode2<T> node, T value) {
+        ListNode2<T> newNode = new ListNode2<T>(value);
+        addAfter(node, newNode);
+        return newNode;
+    }
+
+    /**
+     * Links {@code newNode} directly after {@code node}; it becomes the head if {@code node}
+     * was the head.
+     *
+     * @param node    node already in this list
+     * @param newNode node to insert
+     */
+    public void addAfter(ListNode2<T> node, ListNode2<T> newNode) {
+        newNode.next = node.next;
+        newNode.prev = node;
+        node.next = newNode;
+        if (newNode.next == null) {
+            head = newNode;
+        } else {
+            newNode.next.prev = newNode;
+        }
+        size++;
+    }
+
+    /**
+     * Unlinks {@code node}, which must currently belong to this list (otherwise the size and
+     * links are corrupted). The removed node's own prev/next fields are left untouched.
+     *
+     * @param node node to remove
+     */
+    public void remove(ListNode2<T> node) {
+        if (node == tail) {
+            tail = node.next;
+        } else {
+            node.prev.next = node.next;
+        }
+
+        if (node == head) {
+            head = node.prev;
+        } else {
+            node.next.prev = node.prev;
+        }
+        size--;
+    }
+
+    /** @return number of elements in the list */
+    public int size() {
+        return size;
+    }
+
+
+    /** @return an iterator from tail (first) to head (last); it does not support removal */
+    @Override
+    public Iterator<T> iterator() {
+        return new DoublyLinkedListIterator(this);
+    }
+
+    /**
+     * Tail-to-head iterator. Concurrent modification is detected only via a change of the
+     * list size.
+     */
+    protected class DoublyLinkedListIterator implements Iterator<T> {
+
+        protected DoublyLinkedList<T> list;
+        protected ListNode2<T> itr;
+        protected int length;
+
+        public DoublyLinkedListIterator(DoublyLinkedList<T> list) {
+            this.length = list.size;
+            this.list = list;
+            this.itr = list.tail;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return itr != null;
+        }
+
+        @Override
+        public T next() {
+            if (length != list.size) {
+                throw new ConcurrentModificationException();
+            }
+            if (itr == null) {
+                // Iterator contract: an exhausted iterator throws NoSuchElementException
+                // rather than failing with a NullPointerException.
+                throw new NoSuchElementException();
+            }
+            T next = itr.value;
+            itr = itr.next;
+            return next;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    /** @return the value at the tail (first element), or null if the list is empty */
+    public T first() {
+        return tail == null ? null : tail.getValue();
+    }
+
+    /** @return the value at the head (last element), or null if the list is empty */
+    public T last() {
+        return head == null ? null : head.getValue();
+    }
+
+    /** @return the head node (last element), or null if the list is empty */
+    public ListNode2<T> head() {
+        return head;
+    }
+
+    /** @return the tail node (first element), or null if the list is empty */
+    public ListNode2<T> tail() {
+        return tail;
+    }
+
+    /** @return true if the list holds no elements */
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    /**
+     * Copies the values, tail to head, into an array. The runtime array type is
+     * {@code Object[]}, so assigning the result to a more specific array type (e.g.
+     * {@code String[]}) fails with a ClassCastException.
+     *
+     * @return the values in iteration order
+     */
+    @SuppressWarnings("unchecked")
+    public T[] toArray() {
+        T[] a = (T[]) new Object[size];
+        int i = 0;
+        for (T v : this) {
+            a[i++] = v;
+        }
+        return a;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java b/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
new file mode 100644
index 0000000..36603b7
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import java.util.List;
+
+/**
+ * A summary of a stream that tracks its (approximately) most frequent elements.
+ *
+ * Modified from https://github.com/addthis/stream-lib
+ *
+ * @param <T> type of the stream elements
+ */
+public interface ITopK<T> {
+
+    /**
+     * Records one occurrence of {@code element}.
+     *
+     * @param element the element to record
+     * @return false if the element was already in the top
+     */
+    boolean offer(T element);
+
+    /**
+     * Records {@code element} and raises its count by {@code incrementCount}.
+     *
+     * @param element        the element to record
+     * @param incrementCount amount added to the element's count
+     * @return false if the element was already in the top
+     */
+    boolean offer(T element, double incrementCount);
+
+    /**
+     * Returns the most frequent elements recorded so far.
+     *
+     * @param k number of elements requested
+     * @return top k elements offered (may be an approximation)
+     */
+    List<T> peek(int k);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java b/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
new file mode 100644
index 0000000..92f5f57
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+/**
+ * A node of {@link DoublyLinkedList}: one value plus links to both neighbours.
+ * In DoublyLinkedList, {@code prev} points towards the tail and {@code next} towards the head.
+ *
+ * Modified from ListNode2.java in https://github.com/addthis/stream-lib
+ *
+ * @param <T> type of the stored value
+ */
+public class ListNode2<T> {
+
+    protected T value;
+    protected ListNode2<T> prev;
+    protected ListNode2<T> next;
+
+    /**
+     * Creates a node that is not yet linked to any neighbour.
+     *
+     * @param initialValue value held by the node
+     */
+    public ListNode2(T initialValue) {
+        value = initialValue;
+    }
+
+    /** @return the value held by this node */
+    public T getValue() {
+        return this.value;
+    }
+
+    /** @param newValue the value to hold from now on */
+    public void setValue(T newValue) {
+        value = newValue;
+    }
+
+    /** @return the previous neighbour, or null if there is none */
+    public ListNode2<T> getPrev() {
+        return this.prev;
+    }
+
+    /** @return the next neighbour, or null if there is none */
+    public ListNode2<T> getNext() {
+        return this.next;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
new file mode 100644
index 0000000..0a45d0b
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.util.Pair;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
+ * 
+ * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
+ * data structure as described in:
+ * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
+ * by Metwally, Agrawal, and Abbadi
+ *
+ * @param <T> type of data in the stream to be summarized
+ */
+public class TopNCounter<T> implements ITopK<T>, Externalizable {
+
+    protected class Bucket {
+
+        protected DoublyLinkedList<Counter<T>> counterList;
+
+        private double count;
+
+        public Bucket(double count) {
+            this.count = count;
+            this.counterList = new DoublyLinkedList<Counter<T>>();
+        }
+    }
+
+    protected int capacity;
+    private HashMap<T, ListNode2<Counter<T>>> counterMap;
+    protected DoublyLinkedList<Bucket> bucketList;
+
+    /**
+     * @param capacity maximum size (larger capacities improve accuracy)
+     */
+    public TopNCounter(int capacity) {
+        this.capacity = capacity;
+        counterMap = new HashMap<T, ListNode2<Counter<T>>>();
+        bucketList = new DoublyLinkedList<Bucket>();
+    }
+
+    public int getCapacity() {
+        return capacity;
+    }
+
+    /**
+     * Algorithm: <i>Space-Saving</i>
+     *
+     * @param item stream element (<i>e</i>)
+     * @return false if item was already in the stream summary, true otherwise
+     */
+    @Override
+    public boolean offer(T item) {
+        return offer(item, 1.0);
+    }
+
+    /**
+     * Algorithm: <i>Space-Saving</i>
+     *
+     * @param item stream element (<i>e</i>)
+     * @return false if item was already in the stream summary, true otherwise
+     */
+    @Override
+    public boolean offer(T item, double incrementCount) {
+        return offerReturnAll(item, incrementCount).getFirst();
+    }
+
+    /**
+     * @param item stream element (<i>e</i>)
+     * @return item dropped from summary if an item was dropped, null otherwise
+     */
+    public T offerReturnDropped(T item, int incrementCount) {
+        return offerReturnAll(item, incrementCount).getSecond();
+    }
+
+    /**
+     * @param item stream element (<i>e</i>)
+     * @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and itemDropped is null if no item was dropped
+     */
+    public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
+        ListNode2<Counter<T>> counterNode = counterMap.get(item);
+        boolean isNewItem = (counterNode == null);
+        T droppedItem = null;
+        if (isNewItem) {
+
+            if (size() < capacity) {
+                counterNode = bucketList.enqueue(new Bucket(0)).getValue().counterList.add(new Counter<T>(bucketList.tail(), item));
+            } else {
+                Bucket min = bucketList.first();
+                counterNode = min.counterList.tail();
+                Counter<T> counter = counterNode.getValue();
+                droppedItem = counter.item;
+                counterMap.remove(droppedItem);
+                counter.item = item;
+                counter.error = min.count;
+            }
+            counterMap.put(item, counterNode);
+        }
+
+        incrementCounter(counterNode, incrementCount);
+
+        return new Pair<Boolean, T>(isNewItem, droppedItem);
+    }
+
+    protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
+        Counter<T> counter = counterNode.getValue(); // count_i
+        ListNode2<Bucket> oldNode = counter.bucketNode;
+        Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
+        bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
+        counter.count = counter.count + incrementCount;
+
+        // Finding the right bucket for count_i
+        // Because we allow a single call to increment count more than once, this may not be the adjacent bucket. 
+        ListNode2<Bucket> bucketNodePrev = oldNode;
+        ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
+        while (bucketNodeNext != null) {
+            Bucket bucketNext = bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
+            if (counter.count == bucketNext.count) {
+                bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
+                break;
+            } else if (counter.count > bucketNext.count) {
+                bucketNodePrev = bucketNodeNext;
+                bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
+            } else {
+                // A new bucket has to be created
+                bucketNodeNext = null;
+            }
+        }
+
+        if (bucketNodeNext == null) {
+            Bucket bucketNext = new Bucket(counter.count);
+            bucketNext.counterList.add(counterNode);
+            bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
+        }
+        counter.bucketNode = bucketNodeNext;
+
+        //Cleaning up
+        if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
+        {
+            bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
+        }
+    }
+
+    @Override
+    public List<T> peek(int k) {
+        List<T> topK = new ArrayList<T>(k);
+
+        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Bucket b = bNode.getValue();
+            for (Counter<T> c : b.counterList) {
+                if (topK.size() == k) {
+                    return topK;
+                }
+                topK.add(c.item);
+            }
+        }
+
+        return topK;
+    }
+
+    public List<Counter<T>> topK(int k) {
+        List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
+
+        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Bucket b = bNode.getValue();
+            for (Counter<T> c : b.counterList) {
+                if (topK.size() == k) {
+                    return topK;
+                }
+                topK.add(c);
+            }
+        }
+
+        return topK;
+    }
+
+    /**
+     * @return number of items stored
+     */
+    public int size() {
+        return counterMap.size();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Bucket b = bNode.getValue();
+            sb.append('{');
+            sb.append(b.count);
+            sb.append(":[");
+            for (Counter<T> c : b.counterList) {
+                sb.append('{');
+                sb.append(c.item);
+                sb.append(':');
+                sb.append(c.error);
+                sb.append("},");
+            }
+            if (b.counterList.size() > 0) {
+                sb.deleteCharAt(sb.length() - 1);
+            }
+            sb.append("]},");
+        }
+        if (bucketList.size() > 0) {
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        sb.append(']');
+        return sb.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        this.bucketList = new DoublyLinkedList<Bucket>();
+        this.capacity = in.readInt();
+
+        int size = in.readInt();
+        this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
+
+        Bucket currentBucket = null;
+        ListNode2<Bucket> currentBucketNode = null;
+        for (int i = 0; i < size; i++) {
+            Counter<T> c = (Counter<T>) in.readObject();
+            if (currentBucket == null || c.count != currentBucket.count) {
+                currentBucket = new Bucket(c.count);
+                currentBucketNode = bucketList.add(currentBucket);
+            }
+            c.bucketNode = currentBucketNode;
+            counterMap.put(c.item, currentBucket.counterList.add(c));
+        }
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(this.capacity);
+        out.writeInt(this.size());
+        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
+            Bucket b = bNode.getValue();
+            for (Counter<T> c : b.counterList) {
+                out.writeObject(c);
+            }
+        }
+    }
+
+    /**
+     * For de-serialization
+     */
+    public TopNCounter() {
+    }
+
+    /**
+     * For de-serialization
+     *
+     * @param bytes
+     * @throws java.io.IOException
+     * @throws ClassNotFoundException
+     */
+    public TopNCounter(byte[] bytes) throws IOException, ClassNotFoundException {
+        fromBytes(bytes);
+    }
+
+    public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
+        readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
+    }
+
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(baos);
+        this.writeExternal(out);
+        out.flush();
+        return baos.toByteArray();
+
+    }
+
+    public TopNCounter<T> merge(TopNCounter<T> another) throws IOException, ClassNotFoundException {
+        TopNCounter<T> secondCounter = new TopNCounter<T>(another.capacity);
+        secondCounter.fromBytes(another.toBytes());
+        double m1 = 0.0, m2 = 0.0;
+        if (this.size() >= this.capacity) {
+            m1 = this.bucketList.tail().getValue().count;
+        }
+
+        if (secondCounter.size() >= secondCounter.capacity) {
+            m2 = secondCounter.bucketList.tail().getValue().count;
+        }
+
+        for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
+            T item = entry.getKey();
+            ListNode2<Counter<T>> existing = secondCounter.counterMap.get(item);
+            if (existing != null) {
+                this.offer(item, secondCounter.counterMap.get(item).getValue().count);
+                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error+ secondCounter.counterMap.get(item).getValue().error;
+
+                secondCounter.counterMap.remove(item);
+            } else {
+                this.offer(item, m2);
+                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
+            }
+        }
+
+        for (Map.Entry<T, ListNode2<Counter<T>>> entry : secondCounter.counterMap.entrySet()) {
+            T item = entry.getKey();
+            double counter = entry.getValue().getValue().count;
+            double error = entry.getValue().getValue().error;
+            this.offer(item, counter + m1);
+            this.counterMap.get(item).getValue().error = error + m1;
+        }
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
new file mode 100644
index 0000000..fb82522
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class TopNCounterComparisonTest {
+
+    private static final int TOP_K = 100;
+
+    private static final int KEY_SPACE = 100 * TOP_K;
+
+    private static final int TOTAL_RECORDS = 100 * KEY_SPACE;
+
+    private static final int SPACE_SAVING_ROOM = 100;
+
+    @Before
+    public void setup() {
+    }
+
+    @After
+    public void tearDown() {
+    }
+
+    /**
+     * Generates the test data: TOTAL_RECORDS lines, each one of KEY_SPACE random 10-letter keys
+     * drawn with a Zipf(0.8) distribution, written into a new temporary file.
+     *
+     * @return absolute path of the generated file; the caller is responsible for deleting it
+     */
+    protected String prepareTestDate() throws IOException {
+        String[] allKeys = new String[KEY_SPACE];
+
+        for (int i = 0; i < KEY_SPACE; i++) {
+            allKeys[i] = RandomStringUtils.randomAlphabetic(10);
+        }
+
+        System.out.println("Start to create test random data...");
+        long startTime = System.currentTimeMillis();
+        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+        int keyIndex;
+
+        File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+        if (tempFile.exists()) {
+            FileUtils.forceDelete(tempFile);
+        }
+        FileWriter fw = new FileWriter(tempFile);
+        try {
+            for (int i = 0; i < TOTAL_RECORDS; i++) {
+                keyIndex = zipf.sample();
+                fw.write(allKeys[keyIndex]);
+                fw.write('\n');
+            }
+        } finally {
+            fw.close();
+        }
+
+        System.out.println("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
+        System.out.println("Test data in : " + tempFile.getAbsolutePath());
+
+        return tempFile.getAbsolutePath();
+    }
+
+    //@Test
+    public void testCorrectness() throws IOException {
+        String dataFile = prepareTestDate();
+        try {
+            SpaceSavingConsumer spaceSavingCounter = new SpaceSavingConsumer();
+            HashMapConsumer accurateCounter = new HashMapConsumer();
+
+            for (TestDataConsumer consumer : new TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+                feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
+            }
+
+            compareResult(spaceSavingCounter, accurateCounter);
+        } finally {
+            // Remove the (large) data file even when the comparison fails; deleteQuietly
+            // does not mask the assertion error with a deletion exception.
+            FileUtils.deleteQuietly(new File(dataFile));
+        }
+    }
+
+    /**
+     * Asserts that the first consumer's top-K list matches the second's position by position,
+     * comparing both key and count.
+     */
+    private void compareResult(TestDataConsumer firstConsumer, TestDataConsumer secondConsumer) {
+        List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+        System.out.println("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
+        List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
+        System.out.println("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
+
+        int error = 0;
+        for (int i = 0; i < topResult1.size(); i++) {
+            System.out.println("Compare " + i);
+
+            if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst())
+                    && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+                System.out.println("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+            } else {
+                System.out.println("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+                System.out.println("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+                error++;
+            }
+        }
+
+        Assert.assertEquals(0, error);
+    }
+
+    @Test
+    public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
+        String dataFile = prepareTestDate();
+        try {
+            int PARALLEL = 10;
+            SpaceSavingConsumer[] parallelCounters = new SpaceSavingConsumer[PARALLEL];
+
+            for (int i = 0; i < PARALLEL; i++) {
+                parallelCounters[i] = new SpaceSavingConsumer();
+            }
+
+            int slice = TOTAL_RECORDS / PARALLEL;
+            int startPosition = 0;
+            for (int i = 0; i < PARALLEL; i++) {
+                feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
+                startPosition += slice;
+            }
+
+            // merge counters pairwise until a single one remains
+            SpaceSavingConsumer[] mergedCounters = mergeSpaceSavingConsumer(parallelCounters);
+
+            HashMapConsumer accurateCounter = new HashMapConsumer();
+            feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+            compareResult(mergedCounters[0], accurateCounter);
+        } finally {
+            FileUtils.deleteQuietly(new File(dataFile));
+        }
+    }
+
+    /**
+     * Merges neighbouring consumers pairwise (0+1, 2+3, ...) recursively; the fully merged
+     * counter ends up at index 0 of the returned array.
+     */
+    private SpaceSavingConsumer[] mergeSpaceSavingConsumer(SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+        List<SpaceSavingConsumer> list = Lists.newArrayList();
+        if (consumers.length == 1)
+            return consumers;
+
+        for (int i = 0, n = consumers.length; i < n; i = i + 2) {
+            if (i + 1 < n) {
+                consumers[i].vs.merge(consumers[i + 1].vs);
+            }
+
+            list.add(consumers[i]);
+        }
+
+        return mergeSpaceSavingConsumer(list.toArray(new SpaceSavingConsumer[list.size()]));
+    }
+
+    /** Feeds lines [startLine, endLine) of {@code dataFile} to {@code consumer}, each with weight 1. */
+    private void feedDataToConsumer(String dataFile, TestDataConsumer consumer, int startLine, int endLine) throws IOException {
+        long startTime = System.currentTimeMillis();
+        BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
+        try {
+            int lineNum = 0;
+            String line = bufferedReader.readLine();
+            while (line != null) {
+                if (lineNum >= startLine && lineNum < endLine) {
+                    consumer.addElement(line, 1.0);
+                }
+                line = bufferedReader.readLine();
+                lineNum++;
+            }
+        } finally {
+            // Close the reader even if reading or the consumer throws.
+            bufferedReader.close();
+        }
+        System.out.println("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+    }
+
+    private static interface TestDataConsumer {
+        public void addElement(String elementKey, double value);
+
+        public List<Pair<String, Double>> getTopN(int k);
+
+        public long getSpentTime();
+    }
+
+    /** Approximate top-N through TopNCounter (Space-Saving). */
+    private static class SpaceSavingConsumer implements TestDataConsumer {
+        private long timeSpent = 0;
+        protected TopNCounter<String> vs;
+
+        public SpaceSavingConsumer() {
+            vs = new TopNCounter<String>(TOP_K * SPACE_SAVING_ROOM);
+        }
+
+        @Override
+        public void addElement(String key, double value) {
+            long startTime = System.currentTimeMillis();
+            vs.offer(key, value);
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Counter<String>> tops = vs.topK(k);
+            List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+            for (Counter<String> counter : tops) {
+                allRecords.add(new Pair<String, Double>(counter.getItem(), counter.getCount()));
+            }
+            timeSpent += (System.currentTimeMillis() - startTime);
+            return allRecords;
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
+    }
+
+    /** Exact top-N: a full HashMap of counts sorted by descending count. */
+    private static class HashMapConsumer implements TestDataConsumer {
+
+        private long timeSpent = 0;
+        private Map<String, Double> hashMap;
+
+        public HashMapConsumer() {
+            hashMap = Maps.newHashMap();
+        }
+
+        @Override
+        public void addElement(String key, double value) {
+            long startTime = System.currentTimeMillis();
+            if (hashMap.containsKey(key)) {
+                hashMap.put(key, hashMap.get(key) + value);
+            } else {
+                hashMap.put(key, value);
+            }
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+            for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+                allRecords.add(new Pair<String, Double>(entry.getKey(), entry.getValue()));
+            }
+
+            Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
+                @Override
+                public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
+                    return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
+                }
+            });
+            timeSpent += (System.currentTimeMillis() - startTime);
+            // Fewer than k distinct keys is possible; subList(0, k) would throw in that case.
+            return allRecords.subList(0, Math.min(k, allRecords.size()));
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
new file mode 100644
index 0000000..23620d1
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TopNCounterTest {
+
+    private static final int NUM_ITERATIONS = 100000;
+    // NOTE(review): NUM_ITERATIONS is not referenced by any test in this class.
+    @Test
+    public void testTopNCounter() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+        for (String i : stream) {
+                vs.offer(i);
+            /*
+             * Print the counter's toString() after every offer so the
+             * evolution of the summary can be followed in the test output.
+             */
+            System.out.println(vs);
+        }
+
+        List<Counter<String>> topk = vs.topK(6);
+        // Requests 6 entries from a counter constructed with 3; results are printed only, not asserted.
+        for(Counter<String> top : topk) {
+            System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
+        }
+        
+    }
+
+    @Test
+    public void testTopK() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrement() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i, 10);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+    // Offering an item once with an increment must rank items the same as offering it that many times.
+    @Test
+    public void testTopKWithIncrementOutOfOrder() {
+        TopNCounter<String> vs_increment = new TopNCounter<String>(3);
+        TopNCounter<String> vs_single = new TopNCounter<String>(3);
+        String[] stream = {"A", "B", "C", "D", "A"};
+        Integer[] increments = {15, 20, 25, 30, 1};
+
+        for (int i = 0; i < stream.length; i++) {
+            vs_increment.offer(stream[i], increments[i]);
+            for (int k = 0; k < increments[i]; k++) {
+                vs_single.offer(stream[i]);
+            }
+        }
+        System.out.println("Insert with counts vs. single inserts:");
+        System.out.println(vs_increment);
+        System.out.println(vs_single);
+
+        List<Counter<String>> topK_increment = vs_increment.topK(3);
+        List<Counter<String>> topK_single = vs_single.topK(3);
+
+        for (int i = 0; i < topK_increment.size(); i++) {
+            assertEquals(topK_increment.get(i).getItem(),
+                    topK_single.get(i).getItem());
+        }
+    }
+    // Java-serialization round trip of each Counter returned by topK(3): count, error and item must survive.
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCounterSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutput oo = new ObjectOutputStream(baos);
+            oo.writeObject(c);
+            oo.close();
+
+            ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+            Counter<String> clone = (Counter<String>) oi.readObject();
+            assertEquals(c.getCount(), clone.getCount(), 0.0001);
+            assertEquals(c.getError(), clone.getError(), 0.0001);
+            assertEquals(c.getItem(), clone.getItem());
+        }
+    }
+
+    // Java-serialization round trip of the whole TopNCounter, compared via toString().
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutput oo = new ObjectOutputStream(baos);
+        oo.writeObject(vs);
+        oo.close();
+
+        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+        TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+
+        assertEquals(vs.toString(), clone.toString());
+    }
+
+    // Round trip through toBytes() / TopNCounter(byte[]) for both a populated and an empty counter.
+    @Test
+    public void testByteSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        testSerialization(vs);
+
+        // Empty counter, constructed with 0
+        vs = new TopNCounter<String>(0);
+        testSerialization(vs);
+    }
+    // Serializes vs with toBytes(), rebuilds it via the byte[] constructor and compares toString() output.
+    private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
+        byte[] bytes = vs.toBytes();
+        TopNCounter<String> clone = new TopNCounter<String>(bytes);
+
+        assertEquals(vs.toString(), clone.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 360c6b1..051df4d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
         <commons-httpclient.version>3.1</commons-httpclient.version>
         <commons-collections4.version>4.0</commons-collections4.version>
         <commons-email.version>1.1</commons-email.version>
+        <commons-math3.version>3.6-SNAPSHOT</commons-math3.version>
 
         <!-- Spark -->
         <spark.version>1.3.0</spark.version>
@@ -386,6 +387,11 @@
                 <version>${commons-email.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-math3</artifactId>
+                <version>${commons-math3.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>



[3/7] incubator-kylin git commit: KYLIN-943 Update TopNCounter serialization to ascending order

Posted by sh...@apache.org.
KYLIN-943 Update TopNCounter serialization to ascending order


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f3fe0e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f3fe0e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f3fe0e50

Branch: refs/heads/KYLIN-943
Commit: f3fe0e509fdf6a87baeb917b7588f390064afa6f
Parents: 6208520
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 1 21:02:02 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/topn/TopNCounter.java   |  78 +++----------
 .../kylin/common/topn/TopNCounterBasicTest.java | 110 +++----------------
 .../serializer/TopNCounterSerializerTest.java   |  41 +++++++
 3 files changed, 71 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 51fe1b2..69e8d56 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -38,7 +38,7 @@ import java.util.Map;
  *
  * @param <T> type of data in the stream to be summarized
  */
-public class TopNCounter<T> implements ITopK<T>, Externalizable {
+public class TopNCounter<T> implements ITopK<T> {
     
     public static final int EXTRA_SPACE_RATE = 50;
 
@@ -101,7 +101,7 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
      * @param item stream element (<i>e</i>)
      * @return item dropped from summary if an item was dropped, null otherwise
      */
-    public T offerReturnDropped(T item, int incrementCount) {
+    public T offerReturnDropped(T item, double incrementCount) {
         return offerReturnAll(item, incrementCount).getSecond();
     }
 
@@ -241,28 +241,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
         return sb.toString();
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        this.bucketList = new DoublyLinkedList<Bucket>();
-        this.capacity = in.readInt();
-
-        int size = in.readInt();
-        this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
-
-        Bucket currentBucket = null;
-        ListNode2<Bucket> currentBucketNode = null;
-        for (int i = 0; i < size; i++) {
-            Counter<T> c = (Counter<T>) in.readObject();
-            if (currentBucket == null || c.count != currentBucket.count) {
-                currentBucket = new Bucket(c.count);
-                currentBucketNode = bucketList.add(currentBucket);
-            }
-            c.bucketNode = currentBucketNode;
-            counterMap.put(c.item, currentBucket.counterList.add(c));
-        }
-    }
-
     public void fromExternal(int size, double[] counters, List<T> items) {
         this.bucketList = new DoublyLinkedList<Bucket>();
 
@@ -283,18 +261,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
         }
     }
 
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(this.capacity);
-        out.writeInt(this.size());
-        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                out.writeObject(c);
-            }
-        }
-    }
-
     /**
      * For de-serialization
      */
@@ -302,30 +268,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
     }
 
     /**
-     * For de-serialization
-     *
-     * @param bytes
-     * @throws java.io.IOException
-     * @throws ClassNotFoundException
-     */
-    public TopNCounter(byte[] bytes) throws IOException, ClassNotFoundException {
-        fromBytes(bytes);
-    }
-
-    public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
-        readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
-    }
-
-    public byte[] toBytes() throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream out = new ObjectOutputStream(baos);
-        this.writeExternal(out);
-        out.flush();
-        return baos.toByteArray();
-
-    }
-
-    /**
      * Merge another counter into this counter; Note, the other counter will be changed in this method; please make a copy and passed in here;
      * @param another
      * @return
@@ -387,12 +329,16 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
         }
 
     }
-    
+
+    /**
+     * Get the counter values in ascending order
+     * @return
+     */
     public double[] getCounters() {
         double[] counters = new double[size()];
         int index = 0;
 
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
             Bucket b = bNode.getValue();
             for (Counter<T> c : b.counterList) {
                 counters[index] = c.count;
@@ -402,10 +348,14 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
 
         return counters;
     }
-    
+
+    /**
+     * Get the item list order by counter values in ascending order
+     * @return
+     */
     public List<T> getItems() {
         List<T> items = Lists.newArrayList();
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
             Bucket b = bNode.getValue();
             for (Counter<T> c : b.counterList) {
                 items.add(c.item);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index e25f651..252e955 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -16,45 +16,36 @@
 
 package org.apache.kylin.common.topn;
 
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.junit.Test;
 
-import java.io.*;
 import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-
 public class TopNCounterBasicTest {
 
     @Test
     public void testTopNCounter() {
         TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y" };
         for (String i : stream) {
-                vs.offer(i);
-            /*
-        for(String s : vs.poll(3))
-        System.out.print(s+" ");
-             */
-            System.out.println(vs);
+            vs.offer(i);
         }
 
         List<Counter<String>> topk = vs.topK(6);
-        
-        for(Counter<String> top : topk) {
+
+        for (Counter<String> top : topk) {
             System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
         }
-        
+
     }
 
     @Test
     public void testTopK() {
         TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
             vs.offer(i);
         }
@@ -67,7 +58,7 @@ public class TopNCounterBasicTest {
     @Test
     public void testTopKWithIncrement() {
         TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
             vs.offer(i, 10);
         }
@@ -81,8 +72,8 @@ public class TopNCounterBasicTest {
     public void testTopKWithIncrementOutOfOrder() {
         TopNCounter<String> vs_increment = new TopNCounter<String>(3);
         TopNCounter<String> vs_single = new TopNCounter<String>(3);
-        String[] stream = {"A", "B", "C", "D", "A"};
-        Integer[] increments = {15, 20, 25, 30, 1};
+        String[] stream = { "A", "B", "C", "D", "A" };
+        Integer[] increments = { 15, 20, 25, 30, 1 };
 
         for (int i = 0; i < stream.length; i++) {
             vs_increment.offer(stream[i], increments[i]);
@@ -98,102 +89,33 @@ public class TopNCounterBasicTest {
         List<Counter<String>> topK_single = vs_single.topK(3);
 
         for (int i = 0; i < topK_increment.size(); i++) {
-            assertEquals(topK_increment.get(i).getItem(),
-                    topK_single.get(i).getItem());
+            assertEquals(topK_increment.get(i).getItem(), topK_single.get(i).getItem());
         }
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testCounterSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
-        }
-        List<Counter<String>> topK = vs.topK(3);
-        for (Counter<String> c : topK) {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutput oo = new ObjectOutputStream(baos);
-            oo.writeObject(c);
-            oo.close();
-
-            ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-            Counter<String> clone = (Counter<String>) oi.readObject();
-            assertEquals(c.getCount(), clone.getCount(), 0.0001);
-            assertEquals(c.getError(), clone.getError(), 0.0001);
-            assertEquals(c.getItem(), clone.getItem());
-        }
-    }
-
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
-        }
-
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutput oo = new ObjectOutputStream(baos);
-        oo.writeObject(vs);
-        oo.close();
-
-        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-        TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
-
-        assertEquals(vs.toString(), clone.toString());
-    }
-
-
-    @Test
-    public void testByteSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
-        }
-
-        testSerialization(vs);
-
-        // Empty
-        vs = new TopNCounter<String>(0);
-        testSerialization(vs);
-    }
-
-    private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
-        byte[] bytes = vs.toBytes();
-        TopNCounter<String> clone = new TopNCounter<String>(bytes);
-
-        assertEquals(vs.toString(), clone.toString());
-    }
-    
     @Test
     public void testRetain() {
         TopNCounter<String> vs = new TopNCounter<String>(10);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
             vs.offer(i);
         }
-        
+
         vs.retain(5);
         assertTrue(vs.size() <= 5);
         assertTrue(vs.getCapacity() <= 5);
     }
-    
+
     @Test
     public void testMerge() {
 
         TopNCounter<String> vs = new TopNCounter<String>(10);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B" };
         for (String i : stream) {
             vs.offer(i);
         }
 
-
-        String[] stream2 = {"B", "B", "Z", "Z", "B", "C", "X", "X"};
+        String[] stream2 = { "B", "B", "Z", "Z", "B", "C", "X", "X" };
         TopNCounter<String> vs2 = new TopNCounter<String>(10);
         for (String i : stream2) {
             vs2.offer(i);
@@ -204,6 +126,6 @@ public class TopNCounterBasicTest {
         for (Counter<String> c : topK) {
             assertTrue(Arrays.asList("A", "B", "X").contains(c.getItem()));
         }
-        
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
new file mode 100644
index 0000000..050193b
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.model.DataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Round-trip test for TopNCounterSerializer: serialize a TopNCounter, deserialize it and compare toString().
+ */
+public class TopNCounterSerializerTest {
+    // NOTE(review): serializer uses data type "topn(10)" while the counter below is built with 50 - confirm intended.
+    private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getInstance("topn(10)"));
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCounterSerialization() {
+        TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
+        Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
+        for (Integer i : stream) {
+            vs.offer(new ByteArray(Bytes.toBytes(i)));
+        }
+        // Serialize into an oversized (1024 * 1024 byte) heap buffer.
+        ByteBuffer out = ByteBuffer.allocate(1024 * 1024);
+        serializer.serialize(vs, out);
+        // Copy only the bytes actually written (0 .. position) into a fresh buffer to read back.
+        byte[] copyBytes = new byte[out.position()];
+        System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
+
+        ByteBuffer in = ByteBuffer.wrap(copyBytes);
+        TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
+
+        Assert.assertEquals(vs.toString(), vsNew.toString());
+
+    }
+
+}


[5/7] incubator-kylin git commit: KYLIN-943 Update Aggregator, DataType, FunctionDesc etc for TopN

Posted by sh...@apache.org.
KYLIN-943 Update Aggregator, DataType, FunctionDesc etc for TopN


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/196a537f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/196a537f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/196a537f

Branch: refs/heads/KYLIN-943
Commit: 196a537f8cd0739f45971fc86e2d3c5bdad8accb
Parents: 5980d95
Author: shaofengshi <sh...@apache.org>
Authored: Wed Aug 26 17:47:38 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/topn/Counter.java   |  4 +-
 .../kylin/common/topn/TopNCounterBasicTest.java | 39 ++++++++++++-
 .../common/topn/TopNCounterCombinationTest.java | 38 ++++++------
 .../kylin/common/topn/TopNCounterTest.java      | 10 ++--
 .../metadata/measure/MeasureAggregator.java     |  2 +
 .../kylin/metadata/measure/TopNAggregator.java  | 60 +++++++++++++++++++
 .../serializer/TopNCounterSerializer.java       | 61 ++++++++++++++++++++
 .../apache/kylin/metadata/model/DataType.java   |  8 ++-
 .../kylin/metadata/model/FunctionDesc.java      |  9 ++-
 .../kylin/metadata/model/ParameterDesc.java     | 54 ++++++++---------
 10 files changed, 226 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 0f7d8de..866d3d8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -72,14 +72,14 @@ public class Counter<T> implements Externalizable {
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         item = (T) in.readObject();
         count = in.readDouble();
-        error = in.readDouble();
+        //error = in.readDouble();
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(item);
         out.writeDouble(count);
-        out.writeDouble(error);
+        //out.writeDouble(error);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 0d59314..e25f651 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -30,8 +30,6 @@ import static org.junit.Assert.assertTrue;
 
 public class TopNCounterBasicTest {
 
-    private static final int NUM_ITERATIONS = 100000;
-
     @Test
     public void testTopNCounter() {
         TopNCounter<String> vs = new TopNCounter<String>(3);
@@ -171,4 +169,41 @@ public class TopNCounterBasicTest {
 
         assertEquals(vs.toString(), clone.toString());
     }
+    
+    @Test
+    public void testRetain() {
+        TopNCounter<String> vs = new TopNCounter<String>(10);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        
+        vs.retain(5);
+        assertTrue(vs.size() <= 5);
+        assertTrue(vs.getCapacity() <= 5);
+    }
+    
+    @Test
+    public void testMerge() {
+
+        TopNCounter<String> vs = new TopNCounter<String>(10);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+
+        String[] stream2 = {"B", "B", "Z", "Z", "B", "C", "X", "X"};
+        TopNCounter<String> vs2 = new TopNCounter<String>(10);
+        for (String i : stream2) {
+            vs2.offer(i);
+        }
+        // X: 4+2, C: 2+1, A: 3+0, B: 2 +3, Y: 1+0 Z: 1 +0
+        vs.merge(vs2);
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "B", "X").contains(c.getItem()));
+        }
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index afde8f7..cc0557e 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -29,37 +29,33 @@ public class TopNCounterCombinationTest extends TopNCounterTest {
     public static Collection<Integer[]> configs() {
         //       return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
         return Arrays.asList(new Integer[][] {
-                /*
                 // with 20X space
-                { 100, 10, 20 }, // top 100 among 1,000 keys (top 10%)
-                { 100, 20, 20 }, // top 100 among 2,000 keys (top 5%)
-                { 100, 100, 20 }, // top 100 among 10,000 keys (top 1%)
-                { 100, 1000, 20 }, // top 100 among 100,000 keys (top 0.1%)
-                
-                */
+                { 10, 20 }, // top 10%
+                { 20, 20 }, // top 5%
+                { 100, 20 }, // top 1%
+                { 1000, 20 }, // top 0.1%
+
                 // with 50X space
-                { 100, 10, 50 }, // top 100 among 1,000 keys (top 10%)
-                { 100, 20, 50 }, // top 100 among 2,000 keys (top 5%)
-                { 100, 100, 50 }, // top 100 among 10,000 keys (top 1%)
-                { 100, 1000, 50 }, // top 100 among 100,000 keys (top 0.1%)
+                { 10, 50 }, // top 10% 
+                { 20, 50 }, // top 5% 
+                { 100, 50 }, // top 1% 
+                { 1000, 50 }, // top 0.1%
 
-                /*
                 // with 100X space
-                { 100, 10, 100 }, // top 100 among 1,000 keys (top 10%)
-                { 100, 20, 100 }, // top 100 among 2,000 keys (top 5%)
-                { 100, 100, 100 }, // top 100 among 10,000 keys (top 1%)
-                { 100, 1000, 100 }, // top 100 among 100,000 keys (top 0.1%)
-                */
+                { 10, 100 }, // top 10% 
+                { 20, 100 }, // top 5% 
+                { 100, 100 }, // top 1% 
+                { 1000, 100 }, // top 0.1% 
         });
     }
 
-    public TopNCounterCombinationTest(int topK, int keySpaceRate, int spaceSavingRate) throws Exception {
+    public TopNCounterCombinationTest(int keySpaceRate, int spaceSavingRate) throws Exception {
         super();
-        this.TOP_K = topK;
+        this.TOP_K = 100;
         this.KEY_SPACE = TOP_K * keySpaceRate;
         this.SPACE_SAVING_ROOM = spaceSavingRate;
         TOTAL_RECORDS = 1000000; // 1 million
-        this.PARALLEL = 50;
-        this.verbose = false;
+        this.PARALLEL = 10;
+        this.verbose = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index e5350ba..edc9ce2 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -41,7 +41,7 @@ public class TopNCounterTest {
     protected static int SPACE_SAVING_ROOM;
 
     protected static int PARALLEL = 10;
-
+    
     protected static boolean verbose = true;
 
     public TopNCounterTest() {
@@ -60,7 +60,7 @@ public class TopNCounterTest {
 
         outputMsg("Start to create test random data...");
         long startTime = System.currentTimeMillis();
-        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE, 0.5);
         int keyIndex;
 
         File tempFile = File.createTempFile("ZipfDistribution", ".txt");
@@ -70,7 +70,7 @@ public class TopNCounterTest {
         FileWriter fw = new FileWriter(tempFile);
         try {
             for (int i = 0; i < TOTAL_RECORDS; i++) {
-                keyIndex = zipf.sample();
+                keyIndex = zipf.sample() -1;
                 fw.write(allKeys[keyIndex]);
                 fw.write('\n');
             }
@@ -85,7 +85,7 @@ public class TopNCounterTest {
         return tempFile.getAbsolutePath();
     }
 
-    @Test
+    //@Test
     public void testSingleSpaceSaving() throws IOException {
         String dataFile = prepareTestDate();
         TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
@@ -163,7 +163,7 @@ public class TopNCounterTest {
         if (consumers.length == 1)
             return consumers;
 
-        TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * 10);
+        TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * PARALLEL);
         
         for (int i=0, n=consumers.length; i<n; i++) {
             merged.vs.merge(consumers[i].vs);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
index 4153cbd..b533044 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -55,6 +55,8 @@ abstract public class MeasureAggregator<V> {
                 return new BigDecimalMinAggregator();
             else if (isDouble(returnType))
                 return new DoubleMinAggregator();
+        } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
+            return new TopNAggregator();
         }
         throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
new file mode 100644
index 0000000..9cea4cc
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+
+/**
+ * Aggregates TopN measure values by merging every incoming {@link TopNCounter} into one accumulator.
+ * <p>
+ * The accumulator is created with {@code Integer.MAX_VALUE} capacity so merging never evicts entries;
+ * {@link #getState()} then calls {@code retain(capacity)} with the capacity of the first counter
+ * aggregated since the last {@link #reset()} (presumably keeping only that many top entries).
+ */
+public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
+
+    // capacity of the first aggregated counter; the merged result is retained to this size
+    int capacity = 0;
+    // merged counter, or null when nothing has been aggregated since the last reset
+    TopNCounter<ByteArray> sum = null;
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    /**
+     * Merges one counter into the accumulator. Null values carry nothing to merge and are skipped.
+     */
+    @Override
+    public void aggregate(TopNCounter<ByteArray> value) {
+        if (value == null) {
+            return;
+        }
+        if (sum == null) {
+            // unbounded while merging; trimmed back in getState()
+            sum = new TopNCounter<ByteArray>(Integer.MAX_VALUE);
+            capacity = value.getCapacity();
+        }
+
+        sum.merge(value);
+    }
+
+    /**
+     * @return the merged counter retained to {@code capacity}, or null if nothing was aggregated
+     */
+    @Override
+    public TopNCounter<ByteArray> getState() {
+        if (sum == null) {
+            // previously this dereferenced null; an empty aggregator simply has no state
+            return null;
+        }
+        sum.retain(capacity);
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        // NOTE(review): rough guess of 8 bytes per retained entry; not measured — confirm
+        return 8 * capacity;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
new file mode 100644
index 0000000..1c6c442
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.model.DataType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer for TopN measure values ({@link TopNCounter} keyed by {@link ByteArray}).
+ * <p>
+ * NOTE(review): every method below is still a placeholder. {@link #serialize} writes nothing,
+ * {@link #deserialize} and {@link #valueOf} return null, and both length methods return 0.
+ * Nothing should rely on this class until a real encoding is implemented.
+ */
+public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
+
+    // be thread-safe and avoid repeated obj creation
+    // NOTE(review): not referenced by any method yet.
+    private ThreadLocal<TopNCounter<ByteArray>> current = new ThreadLocal<TopNCounter<ByteArray>>();
+
+    // TODO: placeholder; always reports 0 bytes regardless of the buffer content
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 0;
+    }
+
+    // TODO: placeholder; always reports a maximum length of 0
+    @Override
+    public int maxLength() {
+        return 0;
+    }
+
+    // TODO: placeholder; ignores the input and returns null
+    @Override
+    public TopNCounter<ByteArray> valueOf(byte[] value) {
+        return null;
+    }
+
+    // TODO: placeholder; writes nothing to the output buffer
+    @Override
+    public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+
+    }
+
+    // TODO: placeholder; consumes nothing from the buffer and returns null
+    @Override
+    public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
index 9a89499..74ae1d4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
@@ -35,7 +35,7 @@ public class DataType {
 
     public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
             + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
-            + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc" //
+            + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn" //
             + "|" + TblColRef.InnerDataTypeEnum.LITERAL.getDataType() //
             + "|" + TblColRef.InnerDataTypeEnum.DERIVED.getDataType();
 
@@ -183,6 +183,8 @@ public class DataType {
             return 8;
         } else if (isHLLC()) {
             return 1 << precision;
+        } else if (isTopN()) {
+            return 8 * precision * 50;
         }
         throw new IllegalStateException("The return type : " + name + " is not recognized;");
     }
@@ -250,6 +252,10 @@ public class DataType {
     public boolean isHLLC() {
         return name.equals("hllc");
     }
+    
+    /** Returns true when this data type's name is exactly {@code "topn"}. */
+    public boolean isTopN() {
+        final boolean topN = name.equals("topn");
+        return topN;
+    }
 
     public String getName() {
         return name;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index e3f4c48..b87d50c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -34,10 +34,11 @@ public class FunctionDesc {
     public static final String FUNC_MAX = "MAX";
     public static final String FUNC_COUNT = "COUNT";
     public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+    public static final String FUNC_TOP_N = "TOP_N";
 
     public static final String PARAMTER_TYPE_CONSTANT = "constant";
     public static final String PARAMETER_TYPE_COLUMN = "column";
-
+    
     @JsonProperty("expression")
     private String expression;
     @JsonProperty("parameter")
@@ -92,6 +93,10 @@ public class FunctionDesc {
         return FUNC_COUNT_DISTINCT.equalsIgnoreCase(expression);
     }
 
+    /** Returns true when the function expression is TOP_N (case-insensitive; false for a null expression). */
+    public boolean isTopN() {
+        final boolean topN = FUNC_TOP_N.equalsIgnoreCase(expression);
+        return topN;
+    }
+
     public boolean isHolisticCountDistinct() {
         if (isCountDistinct() && returnDataType != null && returnDataType.isBigInt()) {
             return true;
@@ -138,7 +143,7 @@ public class FunctionDesc {
     }
 
     public DataType getSQLType() {
-        if (isCountDistinct())
+        if (isCountDistinct() || isTopN())
             return DataType.ANY;
         else if (isSum() || isMax() || isMin())
             return parameter.getColRefs().get(0).getType();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 1565862..bdb4b91 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -39,6 +39,9 @@ public class ParameterDesc {
     private String type;
     @JsonProperty("value")
     private String value;
+    
+    @JsonProperty("counter")
+    private String counter;
 
     private List<TblColRef> colRefs;
 
@@ -62,6 +65,14 @@ public class ParameterDesc {
         this.value = value;
     }
 
+    /**
+     * Returns the value bound to the {@code "counter"} JSON property (null when it was not set).
+     */
+    public String getCounter() {
+        return this.counter;
+    }
+
+    /**
+     * Sets the value serialized as the {@code "counter"} JSON property.
+     * NOTE(review): its meaning is not visible here (presumably used by TOP_N) — confirm.
+     */
+    public void setCounter(String counter) {
+        this.counter = counter;
+    }
+
     public List<TblColRef> getColRefs() {
         return colRefs;
     }
@@ -85,39 +96,30 @@ public class ParameterDesc {
     }
 
     @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((type == null) ? 0 : type.hashCode());
-        result = prime * result + ((value == null) ? 0 : value.hashCode());
-        return result;
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ParameterDesc that = (ParameterDesc) o;
+
+        if (counter != null ? !counter.equals(that.counter) : that.counter != null) return false;
+        if (type != null ? !type.equals(that.type) : that.type != null) return false;
+        if (value != null ? !value.equals(that.value) : that.value != null) return false;
+
+        return true;
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        ParameterDesc other = (ParameterDesc) obj;
-        if (type == null) {
-            if (other.type != null)
-                return false;
-        } else if (!type.equals(other.type))
-            return false;
-        if (value == null) {
-            if (other.value != null)
-                return false;
-        } else if (!value.equals(other.value))
-            return false;
-        return true;
+    public int hashCode() {
+        int result = type != null ? type.hashCode() : 0;
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        result = 31 * result + (counter != null ? counter.hashCode() : 0);
+        return result;
     }
 
     @Override
     public String toString() {
-        return "ParameterDesc [type=" + type + ", value=" + value + "]";
+        return "ParameterDesc [type=" + type + ", value=" + value + ", counter=" + counter + "]";
     }
 
 }


[4/7] incubator-kylin git commit: KYLIN-943 update query/storage engine to support TopN

Posted by sh...@apache.org.
KYLIN-943 update query/storage engine to support TopN


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/38201be2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/38201be2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/38201be2

Branch: refs/heads/KYLIN-943
Commit: 38201be298772dae4f2a72d2cc463dc7b857f090
Parents: af4883e
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 17 14:05:06 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/topn/TopNCounter.java   | 55 ++++++++++--
 .../kylin/cube/CubeCapabilityChecker.java       | 60 +++++++++++--
 .../apache/kylin/cube/CubeDimensionDeriver.java | 10 ++-
 .../org/apache/kylin/cube/model/CubeDesc.java   |  6 +-
 .../kylin/metadata/model/FunctionDesc.java      | 14 ++-
 .../kylin/metadata/realization/SQLDigest.java   | 13 ++-
 .../apache/kylin/storage/StorageContext.java    | 15 +---
 .../kylin/storage/hybrid/HybridInstance.java    |  4 +-
 .../org/apache/kylin/storage/tuple/Tuple.java   |  3 +
 .../kylin/storage/cache/DynamicCacheTest.java   |  4 +-
 .../kylin/storage/cache/StaticCacheTest.java    |  4 +-
 .../cube_desc/test_kylin_cube_topn_desc.json    |  2 +-
 .../localmeta/data/DEFAULT.STREAMING_TABLE.csv  |  0
 .../test_kylin_inner_join_model_desc.json       |  3 +-
 .../test_kylin_left_join_model_desc.json        |  3 +-
 .../kylin/job/BuildCubeWithEngineTest.java      |  6 +-
 .../apache/kylin/query/relnode/OLAPContext.java | 18 +++-
 .../kylin/query/relnode/OLAPLimitRel.java       |  1 +
 .../apache/kylin/query/relnode/OLAPSortRel.java | 11 +--
 .../kylin/query/test/ITKylinQueryTest.java      |  2 +-
 query/src/test/resources/query/sql/query81.sql  | 26 ++++++
 query/src/test/resources/query/sql/query82.sql  | 26 ++++++
 .../cube/v1/CubeSegmentTopNTupleIterator.java   | 86 ++++++++++++++++++
 .../hbase/cube/v1/CubeSegmentTupleIterator.java | 35 ++++----
 .../storage/hbase/cube/v1/CubeStorageQuery.java | 92 +++++++++++++++-----
 .../hbase/cube/v1/CubeTupleConverter.java       | 71 ++++++++++++++-
 .../cube/v1/SerializedHBaseTupleIterator.java   |  9 +-
 .../storage/hbase/common/ITStorageTest.java     |  4 +-
 28 files changed, 496 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 69e8d56..6814b8d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -23,10 +23,7 @@ import org.apache.kylin.common.util.Pair;
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
@@ -38,10 +35,11 @@ import java.util.Map;
  *
  * @param <T> type of data in the stream to be summarized
  */
-public class TopNCounter<T> implements ITopK<T> {
+public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     
     public static final int EXTRA_SPACE_RATE = 50;
 
+
     protected class Bucket {
 
         protected DoublyLinkedList<Counter<T>> counterList;
@@ -365,4 +363,51 @@ public class TopNCounter<T> implements ITopK<T> {
         return items;
 
     }
+
+    /**
+     * Returns an iterator over every counter. Buckets are visited starting at {@code bucketList.head()}
+     * and following {@code getPrev()}; within a bucket, counters come in the order of its counter list.
+     * {@link Iterator#remove()} is not supported.
+     */
+    @Override
+    public Iterator<Counter<T>> iterator() {
+        return new TopNCounterIterator();
+    }
+    
+    /**
+     * Chains the per-bucket counter iterators of {@code bucketList}.
+     */
+    private class TopNCounterIterator implements Iterator<Counter<T>> {
+
+        // bucket currently being iterated
+        private ListNode2<Bucket> currentBNode;
+        // iterator over the current bucket's counters; null once iteration is exhausted (or list empty)
+        private Iterator<Counter<T>> currentCounterIterator;
+        
+        private TopNCounterIterator() {
+            currentBNode = bucketList.head();
+            if (currentBNode != null && currentBNode.getValue() != null) {
+                currentCounterIterator = currentBNode.getValue().counterList.iterator();
+            }
+        }
+        
+        @Override
+        public boolean hasNext() {
+            if (currentCounterIterator == null) {
+                return false;
+            }
+            // skip past exhausted buckets with a loop rather than recursion
+            while (!currentCounterIterator.hasNext()) {
+                currentBNode = currentBNode.getPrev();
+                if (currentBNode == null) {
+                    // remember exhaustion: a further hasNext() must return false rather than
+                    // call getPrev() on the null node
+                    currentCounterIterator = null;
+                    return false;
+                }
+                currentCounterIterator = currentBNode.getValue().counterList.iterator();
+            }
+            return true;
+        }
+
+        /**
+         * @throws NoSuchElementException when every counter has already been returned
+         */
+        @Override
+        public Counter<T> next() {
+            // hasNext() moves to the next bucket when required, so callers need not call it first
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return currentCounterIterator.next();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    } 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 77c3298..628340e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -18,19 +18,20 @@
 
 package org.apache.kylin.cube;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 /**
  */
 public class CubeCapabilityChecker {
@@ -39,10 +40,11 @@ public class CubeCapabilityChecker {
     public static boolean check(CubeInstance cube, SQLDigest digest, boolean allowWeakMatch) {
 
         // retrieve members from olapContext
-        Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest.groupbyColumns, digest.filterColumns);
+        Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest);
         Collection<FunctionDesc> functions = digest.aggregations;
         Collection<TblColRef> metricsColumns = digest.metricColumns;
         Collection<JoinDesc> joins = digest.joinDescs;
+        boolean hasTopN = hasTopNMeasure(cube.getDescriptor());
 
         // match dimensions & aggregations & joins
 
@@ -62,6 +64,13 @@ public class CubeCapabilityChecker {
             }
         }
 
+        // for topn, the group column can come from measure
+        if (hasTopN & matchJoin && !matchDimensions && functions.size() == 1) {
+            boolean matchedTopN = isMatchedWithTopN(dimensionColumns, cube, digest);
+            matchDimensions = matchedTopN;
+            matchAggregation = matchedTopN;
+        }
+
         if (!isOnline || !matchDimensions || !matchAggregation || !matchJoin) {
             logger.info("Exclude cube " + cube.getName() + " because " + " isOnlne=" + isOnline + ",matchDimensions=" + matchDimensions + ",matchAggregation=" + matchAggregation + ",matchJoin=" + matchJoin);
             return false;
@@ -70,6 +79,47 @@ public class CubeCapabilityChecker {
         return true;
     }
 
+    /**
+     * Checks whether a query with a single SUM aggregation can be served by one of the cube's TOP_N measures.
+     * <p>
+     * For each TOP_N measure, the last column of its parameter is taken as the display column. That column
+     * is excused from the dimension check, and the remaining query columns must be matched by the cube's
+     * dimensions. The measure must also be compatible with the query's aggregation
+     * ({@link FunctionDesc#isCompatible(FunctionDesc)}).
+     *
+     * @param dimensionColumns columns the query groups by or filters on
+     * @param cube             candidate cube
+     * @param digest           query digest; its {@code aggregations} is expected to hold one function
+     * @return true if some TOP_N measure can answer the query
+     */
+    private static boolean isMatchedWithTopN(Collection<TblColRef> dimensionColumns, CubeInstance cube, SQLDigest digest) {
+
+        CubeDesc cubeDesc = cube.getDescriptor();
+        Collection<FunctionDesc> functions = digest.aggregations;
+        if (functions == null || functions.isEmpty()) {
+            return false;
+        }
+
+        FunctionDesc onlyFunction = functions.iterator().next();
+        if (!onlyFunction.isSum()) {
+            // topN only support SUM expression
+            return false;
+        }
+
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            FunctionDesc topNFunction = measure.getFunction();
+            if (!topNFunction.isTopN()) {
+                continue;
+            }
+            List<TblColRef> cols = topNFunction.getParameter().getColRefs();
+            if (cols == null || cols.isEmpty()) {
+                continue;
+            }
+            TblColRef displayCol = cols.get(cols.size() - 1);
+
+            // fresh copy per measure: previously the display column was re-added even when it had not
+            // been present, polluting the dimension set checked for the following measures
+            Collection<TblColRef> remainingDimensions = new ArrayList<TblColRef>(dimensionColumns);
+            remainingDimensions.remove(displayCol);
+            if (isMatchedWithDimensions(remainingDimensions, cube) && topNFunction.isCompatible(onlyFunction)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Tells whether the cube defines at least one measure whose function is TOP_N.
+     *
+     * @param cubeDesc cube descriptor to scan
+     * @return true as soon as a TOP_N measure is found, false otherwise
+     */
+    private static boolean hasTopNMeasure(CubeDesc cubeDesc) {
+        boolean found = false;
+        for (MeasureDesc candidate : cubeDesc.getMeasures()) {
+            if (candidate.getFunction().isTopN()) {
+                found = true;
+                break;
+            }
+        }
+        return found;
+    }
+
     private static boolean isMatchedWithDimensions(Collection<TblColRef> dimensionColumns, CubeInstance cube) {
         CubeDesc cubeDesc = cube.getDescriptor();
         boolean matchAgg = cubeDesc.listDimensionColumnsIncludingDerived().containsAll(dimensionColumns);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
index a746c99..138d01e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
@@ -21,7 +21,9 @@ package org.apache.kylin.cube;
 import java.util.Collection;
 import java.util.HashSet;
 
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
 
 /**
  *
@@ -29,7 +31,13 @@ import org.apache.kylin.metadata.model.TblColRef;
  */
 public class CubeDimensionDeriver {
 
-    public static Collection<TblColRef> getDimensionColumns(Collection<TblColRef> groupByColumns, Collection<TblColRef> filterColumns) {
+    public static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
+        Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
+        Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
+
+        Collection<MeasureDesc> sortMeasures = sqlDigest.sortMeasures;
+        Collection<SQLDigest.OrderEnum> sortOrders = sqlDigest.sortOrders;
+                
         Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
         dimensionColumns.addAll(groupByColumns);
         dimensionColumns.addAll(filterColumns);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 90b5474..9cbdfae 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -129,6 +129,7 @@ public class CubeDesc extends RootPersistentEntity {
     private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
     private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
     private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>();
+
     private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>();
     private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
     private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
@@ -827,5 +828,8 @@ public class CubeDesc extends RootPersistentEntity {
         }
         return result;
     }
-    
+
+    /**
+     * Returns the measure display column set held by this descriptor (the live field, not a copy).
+     */
+    public LinkedHashSet<TblColRef> getMeasureDisplayColumns() {
+        return this.measureDisplayColumns;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index b87d50c..d10f395 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -60,7 +60,7 @@ public class FunctionDesc {
     }
 
     public boolean needRewrite() {
-        return !isSum() && !isDimensionAsMetric();
+        return !isSum() && !isDimensionAsMetric() && !isTopN();
     }
 
     public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
@@ -225,4 +225,16 @@ public class FunctionDesc {
         return "FunctionDesc [expression=" + expression + ", parameter=" + parameter + ", returnType=" + returnType + "]";
     }
 
+    public boolean isCompatible(FunctionDesc another) {
+        if (another == null) {
+            return false;
+        }
+
+        if (this.isTopN() && another.isSum()) {
+            if (this.getParameter().getColRefs().get(0).equals(another.getParameter().getColRefs().get(0)))
+                return true;
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index 7811858..e48cebe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -19,15 +19,22 @@
 package org.apache.kylin.metadata.realization;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  */
 public class SQLDigest {
+
+    /** Sort direction for an entry of {@code sortMeasures}. */
+    public enum OrderEnum { ASCENDING, DESCENDING; }
+
+
     public String factTable;
     public TupleFilter filter;
     public Collection<JoinDesc> joinDescs;
@@ -36,9 +43,11 @@ public class SQLDigest {
     public Collection<TblColRef> filterColumns;
     public Collection<TblColRef> metricColumns;
     public Collection<FunctionDesc> aggregations;
+    public Collection<MeasureDesc> sortMeasures;
+    public Collection<OrderEnum> sortOrders;
 
     public SQLDigest(String factTable, TupleFilter filter, Collection<JoinDesc> joinDescs, Collection<TblColRef> allColumns, //
-            Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc) {
+            Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc, Collection<MeasureDesc> sortMeasures, Collection<OrderEnum> sortOrders) {
         this.factTable = factTable;
         this.filter = filter;
         this.joinDescs = joinDescs;
@@ -47,6 +56,8 @@ public class SQLDigest {
         this.filterColumns = filterColumns;
         this.metricColumns = aggregatedColumns;
         this.aggregations = aggregateFunnc;
+        this.sortMeasures = sortMeasures;
+        this.sortOrders = sortOrders;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 8b1b706..1643aa4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.realization.SQLDigest;
 
 /**
  * @author xjiang
@@ -32,17 +33,12 @@ public class StorageContext {
 
     public static final int DEFAULT_THRESHOLD = 1000000;
 
-    public enum OrderEnum {
-        ASCENDING, DESCENDING
-    }
 
     private String connUrl;
     private int threshold;
     private int limit;
     private int offset;
     private boolean hasSort;
-    private List<MeasureDesc> sortMeasures;
-    private List<OrderEnum> sortOrders;
     private boolean acceptPartialResult;
 
     private boolean exactAggregation;
@@ -59,8 +55,6 @@ public class StorageContext {
         this.totalScanCount = new AtomicLong();
         this.cuboid = null;
         this.hasSort = false;
-        this.sortOrders = new ArrayList<OrderEnum>();
-        this.sortMeasures = new ArrayList<MeasureDesc>();
 
         this.exactAggregation = false;
         this.enableLimit = false;
@@ -110,13 +104,6 @@ public class StorageContext {
         return this.enableLimit;
     }
 
-    public void addSort(MeasureDesc measure, OrderEnum order) {
-        if (measure != null) {
-            sortMeasures.add(measure);
-            sortOrders.add(order);
-        }
-    }
-
     public void markSort() {
         this.hasSort = true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 6ad27d5..0c30a3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -180,7 +180,9 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
 
     @Override
     public DataModelDesc getDataModelDesc() {
-        return this.getLatestRealization().getDataModelDesc();
+        if (this.getLatestRealization() != null)
+            return this.getLatestRealization().getDataModelDesc();
+        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 0ffae69..11b03bd 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -25,6 +25,7 @@ import java.util.List;
 import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.measure.DoubleMutable;
 import org.apache.kylin.metadata.measure.LongMutable;
@@ -65,6 +66,8 @@ public class Tuple implements ITuple {
                 ret.values[i] = null;
             } else if (this.values[i] instanceof HyperLogLogPlusCounter) {
                 ret.values[i] = new HyperLogLogPlusCounter((HyperLogLogPlusCounter) this.values[i]);
+            } else if (this.values[i] instanceof TopNCounter) {
+                ret.values[i] = null;
             } else {
                 ret.values[i] = this.values[i];
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
index 309d67f..161cad6 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.storage.cache;
 
+import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -8,6 +9,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.IdentityUtils;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
@@ -76,7 +78,7 @@ public class DynamicCacheTest {
         final List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
         final TupleInfo tupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);
 
-        SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", null, null, Lists.<TblColRef> newArrayList(), groups, Lists.newArrayList(partitionCol), Lists.<TblColRef> newArrayList(), aggregations);
+        SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", null, null, Lists.<TblColRef> newArrayList(), groups, Lists.newArrayList(partitionCol), Lists.<TblColRef> newArrayList(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
 
         ITuple aTuple = new TsOnlyTuple(partitionCol, "2011-02-01");
         ITuple bTuple = new TsOnlyTuple(partitionCol, "2012-02-01");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
index e54e3e0..48b0b1d 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.storage.cache;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
@@ -8,6 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kylin.common.util.IdentityUtils;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
@@ -34,7 +36,7 @@ public class StaticCacheTest {
         final List<TblColRef> groups = StorageMockUtils.buildGroups();
         final List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
         final TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
-        final SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+        final SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
         final TupleInfo tupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);
 
         final List<ITuple> ret = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
index 2e0e376..96c3ace 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
@@ -111,7 +111,7 @@
     ]
   },
   "last_modified": 1422435345330,
-  "model_name": "test_kylin_inner_join_model_desc",
+  "model_name": "test_kylin_left_join_model_desc",
   "null_string": null,
   "hbase_mapping": {
     "column_family": [

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
index 86f8169..a28684f 100644
--- a/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
@@ -36,7 +36,8 @@
       "columns": [
         "lstg_format_name",
         "LSTG_SITE_ID",
-        "SLR_SEGMENT_CD"
+        "SLR_SEGMENT_CD",
+        "seller_id"
       ]
     },
     {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
index d05a08f..c26ffc5 100644
--- a/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
@@ -47,7 +47,8 @@
       "columns": [
         "lstg_format_name",
         "LSTG_SITE_ID",
-        "SLR_SEGMENT_CD"
+        "SLR_SEGMENT_CD",
+        "seller_id"
       ]
     },
     {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 7c6b028..4564ccc 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -120,13 +120,13 @@ public class BuildCubeWithEngineTest {
     @Test
     public void test() throws Exception {
         DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
-        testInner();
+//        testInner();
         testLeft();
     }
 
     private void testInner() throws Exception {
-       String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
-//        String[] testCase = new String[] { "testInnerJoinTopNCube" };
+        String[] testCase = new String[] { "testInnerJoinTopNCube" };
+       // String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
         runTestAndAssertSucceed(testCase);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 378221c..6865457 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -26,11 +26,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.SQLDigest;
@@ -81,6 +83,8 @@ public class OLAPContext {
     public OLAPContext(int seq) {
         this.id = seq;
         this.storageContext = new StorageContext();
+        this.sortMeasures = Lists.newArrayList();
+        this.sortOrders = Lists.newArrayList();
         Map<String, String> parameters = _localPrarameters.get();
         if (parameters != null) {
             String acceptPartialResult = parameters.get(PRM_ACCEPT_PARTIAL_RESULT);
@@ -111,10 +115,14 @@ public class OLAPContext {
     public Collection<TblColRef> filterColumns = new HashSet<TblColRef>();
     public TupleFilter filter;
     public List<JoinDesc> joins = new LinkedList<JoinDesc>();
+    private List<MeasureDesc> sortMeasures;
+    private List<SQLDigest.OrderEnum> sortOrders;
 
     // rewrite info
     public Map<String, RelDataType> rewriteFields = new HashMap<String, RelDataType>();
 
+    public int limit;
+
     // hive query
     public String sql = "";
 
@@ -126,7 +134,7 @@ public class OLAPContext {
 
     public SQLDigest getSQLDigest() {
         if (sqlDigest == null)
-            sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations);
+            sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, sortMeasures, sortOrders);
         return sqlDigest;
     }
 
@@ -144,4 +152,12 @@ public class OLAPContext {
         }
         this.returnTupleInfo = info;
     }
+
+    public void addSort(MeasureDesc measure, SQLDigest.OrderEnum order) {
+        if (measure != null) {
+            sortMeasures.add(measure);
+            sortOrders.add(order);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
index 60acd40..74d5de0 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
@@ -78,6 +78,7 @@ public class OLAPLimitRel extends SingleRel implements OLAPRel {
         Number limitValue = (Number) (((RexLiteral) localFetch).getValue());
         int limit = limitValue.intValue();
         this.context.storageContext.setLimit(limit);
+        this.context.limit = limit;
         if(localOffset != null) {
             Number offsetValue = (Number) (((RexLiteral) localOffset).getValue());
             int offset = offsetValue.intValue();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
index fa5dc1d..b023dfd 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
@@ -35,6 +35,7 @@ import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexNode;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.storage.StorageContext;
 
 import com.google.common.base.Preconditions;
@@ -82,12 +83,12 @@ public class OLAPSortRel extends Sort implements OLAPRel {
 
         for (RelFieldCollation fieldCollation : this.collation.getFieldCollations()) {
             int index = fieldCollation.getFieldIndex();
-            StorageContext.OrderEnum order = getOrderEnum(fieldCollation.getDirection());
+            SQLDigest.OrderEnum order = getOrderEnum(fieldCollation.getDirection());
             OLAPRel olapChild = (OLAPRel) this.getInput();
             TblColRef orderCol = olapChild.getColumnRowType().getAllColumns().get(index);
             MeasureDesc measure = findMeasure(orderCol);
             if (measure != null) {
-                this.context.storageContext.addSort(measure, order);
+                this.context.addSort(measure, order);
             }
             this.context.storageContext.markSort();
         }
@@ -96,11 +97,11 @@ public class OLAPSortRel extends Sort implements OLAPRel {
         this.columnRowType = buildColumnRowType();
     }
 
-    private StorageContext.OrderEnum getOrderEnum(RelFieldCollation.Direction direction) {
+    private SQLDigest.OrderEnum getOrderEnum(RelFieldCollation.Direction direction) {
         if (direction == RelFieldCollation.Direction.DESCENDING) {
-            return StorageContext.OrderEnum.DESCENDING;
+            return SQLDigest.OrderEnum.DESCENDING;
         } else {
-            return StorageContext.OrderEnum.ASCENDING;
+            return SQLDigest.OrderEnum.ASCENDING;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index 72d7c4a..4821ce9 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query44.sql";
+        String queryFileName = "src/test/resources/query/sql/query82.sql";
 
         File sqlFile = new File(queryFileName);
         if (sqlFile.exists()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/test/resources/query/sql/query81.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query81.sql b/query/src/test/resources/query/sql/query81.sql
new file mode 100644
index 0000000..78e30c5
--- /dev/null
+++ b/query/src/test/resources/query/sql/query81.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT 
+ seller_id 
+ ,sum(test_kylin_fact.price) as GMV 
+ FROM test_kylin_fact inner join edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ where test_kylin_fact.cal_dt < DATE '2013-02-01' 
+ group by 
+ test_kylin_fact.seller_id order by gmv desc limit 100

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/test/resources/query/sql/query82.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query82.sql b/query/src/test/resources/query/sql/query82.sql
new file mode 100644
index 0000000..6b62753
--- /dev/null
+++ b/query/src/test/resources/query/sql/query82.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT 
+ test_kylin_fact.cal_dt, seller_id 
+ ,sum(test_kylin_fact.price) as GMV 
+ FROM test_kylin_fact 
+left JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ group by 
+ test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by gmv desc limit 100

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
new file mode 100644
index 0000000..a8b1d02
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
+import org.apache.kylin.storage.translate.HBaseKeyRange;
+import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link CubeSegmentTupleIterator} for TopN measures.
+ * <p>
+ * Each HBase {@link Result} read from the scanned key ranges is expanded by
+ * {@link CubeTupleConverter#translateTopNResult} into zero or more tuples
+ * (presumably one per entry of the stored TopN counter -- see the converter),
+ * and this iterator flattens those per-result iterators into a single tuple stream.
+ */
+public class CubeSegmentTopNTupleIterator extends CubeSegmentTupleIterator {
+
+    /** Tuples expanded from the current {@link Result}; {@code null} when the next result must be read. */
+    private Iterator<Tuple> innerResultIterator;
+
+    public CubeSegmentTopNTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
+                                    Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, //
+                                    List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
+        super(cubeSeg, keyRanges, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+        // replace the converter built by the parent constructor with one that knows the TopN column
+        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, topNCol);
+    }
+
+    /**
+     * Positions the iterator on the next expanded tuple, reading further results and key ranges as needed.
+     * <p>
+     * Written as a loop rather than by recursing into itself, so that long runs of exhausted key ranges
+     * or of results that expand to no tuples cannot exhaust the call stack.
+     *
+     * @return {@code true} if a tuple is available from {@link #next()}
+     */
+    @Override
+    public boolean hasNext() {
+        if (next != null)
+            return true;
+
+        while (true) {
+            if (innerResultIterator != null) {
+                if (innerResultIterator.hasNext()) {
+                    next = innerResultIterator.next();
+                    return true;
+                }
+                // every tuple of the current result has been returned; read the next result
+                innerResultIterator = null;
+            }
+
+            if (resultIterator == null) {
+                if (!rangeIterator.hasNext())
+                    return false;
+                resultIterator = doScan(rangeIterator.next());
+            }
+
+            if (!resultIterator.hasNext()) {
+                // current key range exhausted: release its scanner and continue with the next range
+                closeScanner();
+                resultIterator = null;
+                continue;
+            }
+
+            Result result = resultIterator.next();
+            scanCount++;
+            if (++scanCountDelta >= 1000)
+                flushScanCountDelta();
+            innerResultIterator = tupleConverter.translateTopNResult(result, oneTuple);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 0110fbe..9b2cf66 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -64,25 +64,26 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
     public static final int SCAN_CACHE = 1024;
 
-    private final CubeSegment cubeSeg;
+    protected final CubeSegment cubeSeg;
     private final TupleFilter filter;
     private final Collection<TblColRef> groupBy;
-    private final Collection<RowValueDecoder> rowValueDecoders;
+    protected final List<RowValueDecoder> rowValueDecoders;
     private final StorageContext context;
     private final String tableName;
     private final HTableInterface table;
 
-    private final CubeTupleConverter tupleConverter;
-    private final Iterator<HBaseKeyRange> rangeIterator;
-    private final Tuple oneTuple; // avoid new instance
+    protected CubeTupleConverter tupleConverter;
+    protected final Iterator<HBaseKeyRange> rangeIterator;
+    protected final Tuple oneTuple; // avoid new instance
 
     private Scan scan;
     private ResultScanner scanner;
-    private Iterator<Result> resultIterator;
-    private int scanCount;
-    private int scanCountDelta;
-    private Tuple next;
-
+    protected Iterator<Result> resultIterator;
+    protected int scanCount;
+    protected int scanCountDelta;
+    protected Tuple next;
+    protected final Cuboid cuboid;
+    
     public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
             Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
             List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
@@ -93,12 +94,12 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         this.context = context;
         this.tableName = cubeSeg.getStorageLocationIdentifier();
 
-        Cuboid cuboid = keyRanges.get(0).getCuboid();
+        cuboid = keyRanges.get(0).getCuboid();
         for (HBaseKeyRange range : keyRanges) {
             assert cuboid.equals(range.getCuboid());
         }
 
-        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo);
+        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, null);
         this.oneTuple = new Tuple(returnTupleInfo);
         this.rangeIterator = keyRanges.iterator();
 
@@ -108,9 +109,10 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
             throw new StorageException("Error when open connection to table " + tableName, t);
         }
     }
-
+    
     @Override
     public boolean hasNext() {
+
         if (next != null)
             return true;
 
@@ -136,6 +138,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return true;
     }
 
+    
     @Override
     public Tuple next() {
         if (next == null) {
@@ -153,7 +156,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         throw new UnsupportedOperationException();
     }
 
-    private final Iterator<Result> doScan(HBaseKeyRange keyRange) {
+    protected final Iterator<Result> doScan(HBaseKeyRange keyRange) {
         Iterator<Result> iter = null;
         try {
             scan = buildScan(keyRange);
@@ -247,7 +250,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return result;
     }
 
-    private void closeScanner() {
+    protected void closeScanner() {
         flushScanCountDelta();
 
         if (logger.isDebugEnabled() && scan != null) {
@@ -286,7 +289,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         closeTable();
     }
 
-    private void flushScanCountDelta() {
+    protected void flushScanCountDelta() {
         context.increaseTotalScanCount(scanCountDelta);
         scanCountDelta = 0;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 836f142..238fbed 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -18,20 +18,10 @@
 
 package org.apache.kylin.storage.hbase.cube.v1;
 
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Pair;
@@ -68,10 +58,7 @@ import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import java.util.*;
 
 //v1
 @SuppressWarnings("unused")
@@ -85,17 +72,32 @@ public class CubeStorageQuery implements ICachableStorageQuery {
     private final CubeInstance cubeInstance;
     private final CubeDesc cubeDesc;
     private final String uuid;
+    private Collection<TblColRef> topNColumns; // display column of each TopN measure of this cube, collected in the constructor
 
     public CubeStorageQuery(CubeInstance cube) {
         this.cubeInstance = cube;
         this.cubeDesc = cube.getDescriptor();
         this.uuid = cube.getUuid();
+        this.topNColumns = Lists.newArrayList(); // one entry per measure whose function isTopN()
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isTopN()) {
+                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
+                topNColumns.add(colRefs.get(colRefs.size() - 1)); // the last parameter colRef is taken as the display column (CubeDesc appends it after the value columns); NOTE(review): fails if getColRefs() is null or empty
+            }
+        }
     }
-
+    
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
 
+        // check whether this is a TopN query;
+        checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+
         Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TblColRef topNCol = extractTopNCol(groups);
+        if (topNCol != null)
+            groups.remove(topNCol);
+
         TupleFilter filter = sqlDigest.filter;
 
         // build dimension & metrics
@@ -148,7 +150,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         setLimit(filter, context);
 
         HConnection conn = HBaseConnection.get(context.getConnUrl());
-        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
+        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
     }
 
     @Override
@@ -179,6 +181,12 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             if (sqlDigest.metricColumns.contains(column)) {
                 continue;
             }
+
+            // skip topN display col
+            if (topNColumns.contains(column)) {
+                continue;
+            }
+
             dimensions.add(column);
         }
     }
@@ -700,4 +708,48 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
     }
 
+    private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { // when grouping by a TopN display column, swaps the single SUM aggregation for the matching measure's function; context and returnTupleInfo are unused
+        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TblColRef topNDisplayCol = extractTopNCol(groups);
+        boolean hasTopN = topNDisplayCol != null;
+        // Not a TopN query: no group-by column is a TopN display column, so sqlDigest is left untouched.
+        if (hasTopN == false)
+            return;
+        // A TopN rewrite supports exactly one aggregation, and it must be SUM; anything else is an IllegalStateException.
+        if (sqlDigest.aggregations.size() != 1) {
+            throw new IllegalStateException("When query with topN, only one metrics is allowed.");
+        }
+
+        FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
+        if (functionDesc.isSum() == false) {
+            throw new IllegalStateException("When query with topN, only SUM function is allowed.");
+        }
+        // Pick the first cube measure whose function isCompatible() with the SUM and whose display column name equals the group-by column's name (case-insensitive).
+        FunctionDesc rewriteFunction = null;
+        // replace the SUM with that measure's function (expected to be the TopN one)
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(measureDesc.getFunction().getParameter().getDisplayColumn())) {
+                rewriteFunction = measureDesc.getFunction();
+                break;
+            }
+        }
+
+        if (rewriteFunction == null) {
+            throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
+        }
+        // Only sqlDigest.aggregations is replaced; groupbyColumns still contains the display column.
+        sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
+        logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
+    }
+
+    private TblColRef extractTopNCol(Collection<TblColRef> colRefs) { // returns the first of colRefs (in iteration order) that is contained in topNColumns, or null
+        for (TblColRef colRef : colRefs) {
+            if (topNColumns.contains(colRef)) {
+                return colRef;
+            }
+        }
+        // none of colRefs is a TopN display column
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index e569cbd..8813901 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -2,21 +2,31 @@ package org.apache.kylin.storage.hbase.cube.v1;
 
 import java.io.IOException;
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowKeyDecoder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
 import org.apache.kylin.storage.tuple.Tuple;
 import org.apache.kylin.storage.tuple.TupleInfo;
@@ -30,20 +40,24 @@ public class CubeTupleConverter {
     final TupleInfo tupleInfo;
     final RowKeyDecoder rowKeyDecoder;
     final List<RowValueDecoder> rowValueDecoders;
-    final List<IDerivedColumnFiller> derivedColFillers;
-
+    final List<IDerivedColumnFiller> derivedColFillers; 
     final int[] dimensionTupleIdx;
     final int[][] metricsMeasureIdx;
     final int[][] metricsTupleIdx;
+    final TblColRef topNCol;
+    int topNColTupleIdx;
+    int topNMeasureTupleIdx;
+    Dictionary<String> topNColDict;
 
-    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) {
+    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo, TblColRef topNCol) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
         this.tupleInfo = tupleInfo;
         this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
         this.rowValueDecoders = rowValueDecoders;
         this.derivedColFillers = Lists.newArrayList();
-
+        this.topNCol = topNCol;
+        
         List<TblColRef> dimCols = cuboid.getColumns();
 
         // pre-calculate dimension index mapping to tuple
@@ -52,6 +66,7 @@ public class CubeTupleConverter {
             TblColRef col = dimCols.get(i);
             dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
         }
+        
 
         // pre-calculate metrics index mapping to tuple
         metricsMeasureIdx = new int[rowValueDecoders.size()][];
@@ -64,6 +79,7 @@ public class CubeTupleConverter {
             metricsTupleIdx[i] = new int[selectedMeasures.cardinality()];
             for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) {
                 FunctionDesc aggrFunc = measures[mi].getFunction();
+                
                 int tupleIdx;
                 // a rewrite metrics is identified by its rewrite field name
                 if (aggrFunc.needRewrite()) {
@@ -80,6 +96,13 @@ public class CubeTupleConverter {
             }
         }
 
+        if (this.topNCol != null) {
+            this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
+            this.topNMeasureTupleIdx = metricsTupleIdx[0][0];
+            
+            this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
+        }
+        
         // prepare derived columns and filler
         Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(dimCols, null);
         for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -93,6 +116,46 @@ public class CubeTupleConverter {
         }
     }
 
+    public Iterator<Tuple> translateTopNResult(Result hbaseRow, Tuple tuple) { // fills tuple from hbaseRow, then returns an iterator over the TopN counter found in it that re-fills that same tuple per entry
+        translateResult(hbaseRow, tuple);
+        Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx]; // topNMeasureTupleIdx is metricsTupleIdx[0][0]: the first selected measure of the first row value decoder
+        assert (topNCounterObj instanceof TopNCounter); // NOTE(review): only evaluated when run with -ea; otherwise a wrong type fails as ClassCastException on the next line
+        return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj);
+    }
+
+    private class TopNCounterTupleIterator implements Iterator { // yields one tuple per entry of a TopN counter; every next() returns the same, re-filled Tuple instance
+        // Non-static inner class: next() reads topNColDict, topNColTupleIdx and topNMeasureTupleIdx from the enclosing converter.
+        private Tuple tuple; // shared with the caller and overwritten on every next()
+        private Iterator<Counter> topNCounterIterator;
+        private Counter<ByteArray> counter; // most recent entry; NOTE(review): the raw Iterator<Counter> makes the assignment in next() unchecked
+        
+        private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
+            this.tuple = tuple;
+            this.topNCounterIterator = topNCounter.iterator();
+        }
+        
+        @Override
+        public boolean hasNext() {
+            return topNCounterIterator.hasNext();
+        }
+
+        @Override
+        public Tuple next() {
+            counter = topNCounterIterator.next();
+            int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length); // the item's bytes are decoded as an unsigned dictionary id; NOTE(review): reads the whole array() from index 0 - confirm ByteArray items never use an offset/sub-range
+            String colValue = topNColDict.getValueFromId(key);
+            tuple.setDimensionValue(topNColTupleIdx, colValue); // NOTE(review): the constructor sets topNColTupleIdx to -1 when the tuple lacks topNCol - confirm setDimensionValue tolerates that
+            tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount()); // the slot that held the whole TopN counter now holds this entry's count
+            // same object on every call: callers must consume it before calling next() again
+            return tuple;
+        }
+        // removal is not supported
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+    
     public void translateResult(Result hbaseRow, Tuple tuple) {
         try {
             byte[] rowkey = hbaseRow.getRow();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index e433b78..831cadb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -58,7 +58,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
     private ITuple next;
 
     public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
-            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
+            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, List<RowValueDecoder> rowValueDecoders, //
             StorageContext context, TupleInfo returnTupleInfo) {
 
         this.context = context;
@@ -67,8 +67,13 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
 
         this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size());
         Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
+        boolean useTopN = topNCol != null;
         for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
-            CubeSegmentTupleIterator segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+            CubeSegmentTupleIterator segIter;
+            if (useTopN)
+                segIter = new CubeSegmentTopNTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
+            else
+                segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
             this.segmentIteratorList.add(segIter);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
index 0b4fd07..df52664 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.common;
 
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -28,6 +29,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
@@ -142,7 +144,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
         int count = 0;
         ITupleIterator iterator = null;
         try {
-            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
             iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
             while (iterator.hasNext()) {
                 ITuple tuple = iterator.next();


[7/7] incubator-kylin git commit: KYLIN-943 TopN in cube builder, on the way

Posted by sh...@apache.org.
KYLIN-943 TopN in cube builder, on the way


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6208520b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6208520b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6208520b

Branch: refs/heads/KYLIN-943
Commit: 6208520b412dd6a99440c496ab556dfa3aaa3e5a
Parents: 196a537
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 1 16:37:46 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/topn/TopNCounter.java   |  52 +++++++
 .../java/org/apache/kylin/cube/CubeManager.java |   4 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  |  18 +--
 .../org/apache/kylin/cube/model/CubeDesc.java   |  35 ++++-
 .../cube/model/CubeJoinedFlatTableDesc.java     |  23 ++-
 .../kylin/metadata/measure/TopNAggregator.java  |   1 -
 .../measure/serializer/DataTypeSerializer.java  |   4 +
 .../serializer/TopNCounterSerializer.java       |  64 +++++++-
 .../kylin/metadata/model/ParameterDesc.java     |  18 +--
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  79 +++++++---
 .../mr/steps/FactDistinctColumnsMapperBase.java |  38 +++--
 .../mr/steps/FactDistinctColumnsReducer.java    |   4 +-
 .../mr/steps/FactDistinctHiveColumnsMapper.java |  14 +-
 .../localmeta/cube/test_kylin_cube_topn.json    |  10 ++
 .../cube_desc/test_kylin_cube_topn_desc.json    | 146 +++++++++++++++++++
 15 files changed, 429 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 2f337c2..51fe1b2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -18,9 +18,11 @@
 
 package org.apache.kylin.common.topn;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.Pair;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,6 +39,8 @@ import java.util.Map;
  * @param <T> type of data in the stream to be summarized
  */
 public class TopNCounter<T> implements ITopK<T>, Externalizable {
+    
+    public static final int EXTRA_SPACE_RATE = 50;
 
     protected class Bucket {
 
@@ -259,6 +263,26 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
         }
     }
 
+    public void fromExternal(int size, double[] counters, List<T> items) { // replaces all buckets and counters with `size` entries, where counters[i] is the count of items.get(i)
+        this.bucketList = new DoublyLinkedList<Bucket>();
+        // NOTE(review): capacity is not modified here, and size is not checked against counters.length / items.size()
+        this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
+        // A new bucket is opened whenever the count differs from the previous entry, so equal counts must be adjacent in the input.
+        Bucket currentBucket = null;
+        ListNode2<Bucket> currentBucketNode = null;
+        for (int i = 0; i < size; i++) {
+            Counter<T> c = new Counter<T>();
+            c.count = counters[i];
+            c.item = items.get(i);
+            if (currentBucket == null || c.count != currentBucket.count) {
+                currentBucket = new Bucket(c.count);
+                currentBucketNode = bucketList.add(currentBucket); // NOTE(review): confirm add() places buckets so that getCounters()/getItems() (head() then getPrev()) return them in input order; otherwise a round trip reverses the order
+            }
+            c.bucketNode = currentBucketNode;
+            counterMap.put(c.item, currentBucket.counterList.add(c)); // a duplicate item would overwrite the earlier counterMap entry
+        }
+    }
+
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(this.capacity);
@@ -363,4 +387,32 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
         }
 
     }
+    
+    public double[] getCounters() { // returns the count of every counter, bucket by bucket from head() via getPrev(); index-aligned with getItems()
+        double[] counters = new double[size()]; // NOTE(review): assumes size() equals the total number of counters across all buckets
+        int index = 0;
+        // Same traversal as getItems(), which is what keeps the two results aligned.
+        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Bucket b = bNode.getValue();
+            for (Counter<T> c : b.counterList) {
+                counters[index] = c.count;
+                index ++;
+            }
+        }
+
+        return counters;
+    }
+    
+    public List<T> getItems() { // returns every counter's item in the same bucket order as getCounters(), so items.get(i) has count getCounters()[i]
+        List<T> items = Lists.newArrayList(); // a fresh mutable list; the item objects themselves are not copied
+        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Bucket b = bNode.getValue();
+            for (Counter<T> c : b.counterList) {
+                items.add(c.item);
+            }
+        }
+
+        return items;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index c6e6b88..2232f01 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -163,11 +163,11 @@ public class CubeManager implements IRealizationProvider {
 
     public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
         CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-        if (!cubeDesc.getRowkey().isUseDictionary(col))
+        if (!cubeDesc.getAllColumnsNeedDictionary().contains(col))
             return null;
 
         DictionaryManager dictMgr = getDictionaryManager();
-        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factTableValueProvider);
+        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), "true", col, factTableValueProvider);
 
         if (dictInfo != null) {
             cubeSeg.putDictResPath(col, dictInfo.getResourcePath());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 714b309..0e7a7ac 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.cube.cli;
 
-import java.io.IOException;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -31,6 +29,8 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class DictionaryGeneratorCLI {
 
     private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
@@ -45,15 +45,13 @@ public class DictionaryGeneratorCLI {
     private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
         CubeManager cubeMgr = CubeManager.getInstance(config);
 
-        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    logger.info("Building dictionary for " + col);
-                    cubeMgr.buildDictionary(cubeSeg, col, factTableValueProvider);
-                }
-            }
+        // dictionary
+        for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionary()) {
+            logger.info("Building dictionary for " + col);
+            cubeMgr.buildDictionary(cubeSeg, col, factTableValueProvider);
+        }
 
+        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
             // build snapshot
             if (dim.getTable() != null && !dim.getTable().equalsIgnoreCase(cubeSeg.getCubeDesc().getFactTable())) {
                 // CubeSegment seg = cube.getTheOnlySegment();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 0060e09..90b5474 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -129,6 +129,7 @@ public class CubeDesc extends RootPersistentEntity {
     private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
     private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
     private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>();
+    private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>();
     private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
     private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
 
@@ -628,18 +629,28 @@ public class CubeDesc extends RootPersistentEntity {
             ParameterDesc p = f.getParameter();
             p.normalizeColumnValue();
 
+            ArrayList<TblColRef> colRefs = Lists.newArrayList();
             if (p.isColumnType()) {
-                ArrayList<TblColRef> colRefs = Lists.newArrayList();
                 for (String cName : p.getValue().split("\\s*,\\s*")) {
                     ColumnDesc sourceColumn = factTable.findColumnByName(cName);
                     TblColRef colRef = new TblColRef(sourceColumn);
                     colRefs.add(colRef);
                     allColumns.add(colRef);
                 }
-                if (colRefs.isEmpty() == false)
-                    p.setColRefs(colRefs);
             }
 
+            // for topN
+            if (StringUtils.isNotEmpty(p.getDisplayColumn())) {
+                ColumnDesc sourceColumn = factTable.findColumnByName(p.getDisplayColumn());
+                TblColRef colRef = new TblColRef(sourceColumn);
+                colRefs.add(colRef);
+                measureDisplayColumns.add(colRef);
+                allColumns.add(colRef);
+            }
+
+            if (colRefs.isEmpty() == false)
+                p.setColRefs(colRefs);
+            
             // verify holistic count distinct as a dependent measure
             if (m.getFunction().isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
                 throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!");
@@ -799,4 +810,22 @@ public class CubeDesc extends RootPersistentEntity {
         this.engineType = engineType;
     }
 
+    
+    public List<TblColRef> getAllColumnsNeedDictionary() { // columns that get a dictionary: dictionary-encoded rowkey columns, then measure display (TopN) columns not already listed
+        List<TblColRef> result = Lists.newArrayList();
+        // 1) rowkey columns, in rowkey order, for which isUseDictionary() is true
+        for (RowKeyColDesc rowKeyColDesc : rowkey.getRowKeyColumns()) {
+            TblColRef colRef = rowKeyColDesc.getColRef();
+            if (rowkey.isUseDictionary(colRef)) {
+                result.add(colRef);
+            }
+        }
+        // 2) display columns collected from measure parameters (TopN), skipping ones already added; contains() is a linear scan
+        for (TblColRef colRef : measureDisplayColumns) {
+            if (!result.contains(colRef))
+                result.add(colRef);
+        }
+        return result;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 4ac8848..adaf542 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -30,9 +30,12 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
 
     private List<IntermediateColumnDesc> columnList = Lists.newArrayList();
 
+    private Map<String, Integer> columnIndexMap; // flat-table column name (colName() form) -> column index; filled by parseCubeDesc()
+
     public CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) {
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
+        this.columnIndexMap = Maps.newHashMap(); // must exist before parseCubeDesc() populates it
         parseCubeDesc();
     }
 
@@ -55,26 +58,25 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
             this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName();
         }
 
-        Map<String, Integer> dimensionIndexMap = Maps.newHashMap();
         int columnIndex = 0;
         for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived()) {
-            dimensionIndexMap.put(colName(col.getCanonicalName()), columnIndex);
+            columnIndexMap.put(colName(col.getCanonicalName()), columnIndex);
             columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col));
             columnIndex++;
         }
 
-        // build index
+        // build index for rowkey columns
         List<TblColRef> cuboidColumns = baseCuboid.getColumns();
         rowKeyColumnIndexes = new int[rowkeyColCount];
         for (int i = 0; i < rowkeyColCount; i++) {
             String colName = colName(cuboidColumns.get(i).getCanonicalName());
-            Integer dimIdx = dimensionIndexMap.get(colName);
+            Integer dimIdx = columnIndexMap.get(colName);
             if (dimIdx == null) {
                 throw new RuntimeException("Can't find column " + colName);
             }
             rowKeyColumnIndexes[i] = dimIdx;
         }
-
+        
         List<MeasureDesc> measures = cubeDesc.getMeasures();
         int measureSize = measures.size();
         measureColumnIndexes = new int[measureSize][];
@@ -90,6 +92,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
                     measureColumnIndexes[i][j] = contains(columnList, c);
                     if (measureColumnIndexes[i][j] < 0) {
                         measureColumnIndexes[i][j] = columnIndex;
+                        columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
                         columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
                         columnIndex++;
                     }
@@ -154,4 +157,14 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     private static String colName(String canonicalColName) {
         return canonicalColName.replace(".", "_");
     }
+
+    public int getColumnIndex(TblColRef colRef) { // returns colRef's column index in the intermediate flat table; throws IllegalArgumentException if it is absent
+        String key = colName(colRef.getCanonicalName()); // same key form used when filling columnIndexMap: canonical name with '.' replaced by '_'
+        Integer index = columnIndexMap.get(key);
+        if (index == null)
+            throw new IllegalArgumentException("Column " + colRef.toString() + " wasn't found on flat table.");
+        // columnIndexMap holds the dimension columns (excluding derived) and the measure columns appended after them
+        return index.intValue();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
index 9cea4cc..0de5fe8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -53,7 +53,6 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
 
     @Override
     public int getMemBytesEstimate() {
-        // 1024 + 60 returned by AggregationCacheMemSizeTest
         return 8 * capacity;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
index e68beda..4fadbb0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
@@ -63,6 +63,10 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
             return new HLLCSerializer(type);
         }
 
+        if (type.isTopN()) {
+            return new TopNCounterSerializer(type);
+        }
+
         Class<?> clz = implementations.get(type.getName());
         if (clz == null)
             throw new RuntimeException("No MeasureSerializer for type " + type);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
index 1c6c442..56ed85d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -18,44 +18,92 @@
 
 package org.apache.kylin.metadata.measure.serializer;
 
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.topn.DoubleDeltaSerializer;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.model.DataType;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 /**
  * 
  */
 public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
 
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<TopNCounter<ByteArray>> current = new ThreadLocal<TopNCounter<ByteArray>>();
+    private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(); // NOTE(review): one instance shared by every call on this serializer; the removed code used a ThreadLocal for thread safety -- confirm DoubleDeltaSerializer keeps no per-call state
+
+    private int precision; // from the topn data type; valueOf() sizes new counters as precision * TopNCounter.EXTRA_SPACE_RATE, maxLength() uses the same product
+    
+    public TopNCounterSerializer(DataType dataType) {
+        this.precision = dataType.getPrecision();
+    }
 
     @Override
     public int peekLength(ByteBuffer in) {
-        return 0;
+        int mark = in.position();
+        in.getInt(); // capacity: consumed but not needed to compute the length
+        int size = in.getInt();
+        int keyLength = in.getInt();
+        dds.deserialize(in); // advances past the variable-length counter block
+        int len = in.position() - mark + keyLength * size;
+        in.position(mark); // peek only: restore the caller's position
+        return len;
     }
 
     @Override
     public int maxLength() {
-        return 0;
+        return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8);
     }
 
     @Override
     public TopNCounter<ByteArray> valueOf(byte[] value) {
-        return null;
+        ByteBuffer buffer = ByteBuffer.wrap(value);
+        int sizeOfId = buffer.getInt();
+        int keyEncodedValue = buffer.getInt();
+        double counter = buffer.getDouble();
+
+        ByteArray key = new ByteArray(sizeOfId);
+        BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, sizeOfId);
+        
+        TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE);
+        topNCounter.offer(key, counter);
+        return topNCounter;
     }
 
     @Override
     public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+        double[] counters = value.getCounters();
+        List<ByteArray> items = value.getItems();
+        int keyLength = items.size() > 0 ? items.get(0).length() : 0; // deserialize() reads every key with this single length
+        out.putInt(value.getCapacity());
+        out.putInt(value.size());
+        out.putInt(keyLength);
+        dds.serialize(counters, out);
 
+        for (ByteArray item : items) {
+            out.put(item.array(), item.offset(), item.length()); // write only this key's slice, not the whole backing array
+        }
     }
 
     @Override
     public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
-        return null;
+        int capacity = in.getInt();
+        int size = in.getInt();
+        int keyLength = in.getInt();
+        double[] counters = dds.deserialize(in);
+        List<ByteArray> items = Lists.newArrayList();
+        
+        for(int i=0; i<size; i++) {
+            ByteArray byteArray = new ByteArray(keyLength);
+            in.get(byteArray.array());
+            items.add(byteArray);
+        }
+        
+        TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
+        counter.fromExternal(size, counters, items);
+        return counter;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index bdb4b91..9773b84 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -40,8 +40,8 @@ public class ParameterDesc {
     @JsonProperty("value")
     private String value;
     
-    @JsonProperty("counter")
-    private String counter;
+    @JsonProperty("displaycolumn")
+    private String displayColumn;
 
     private List<TblColRef> colRefs;
 
@@ -65,12 +65,12 @@ public class ParameterDesc {
         this.value = value;
     }
 
-    public String getCounter() {
-        return counter;
+    public String getDisplayColumn() {
+        return displayColumn;
     }
 
-    public void setCounter(String counter) {
-        this.counter = counter;
+    public void setDisplayColumn(String displayColumn) {
+        this.displayColumn = displayColumn;
     }
 
     public List<TblColRef> getColRefs() {
@@ -102,7 +102,7 @@ public class ParameterDesc {
 
         ParameterDesc that = (ParameterDesc) o;
 
-        if (counter != null ? !counter.equals(that.counter) : that.counter != null) return false;
+        if (displayColumn != null ? !displayColumn.equals(that.displayColumn) : that.displayColumn != null) return false;
         if (type != null ? !type.equals(that.type) : that.type != null) return false;
         if (value != null ? !value.equals(that.value) : that.value != null) return false;
 
@@ -113,13 +113,13 @@ public class ParameterDesc {
     public int hashCode() {
         int result = type != null ? type.hashCode() : 0;
         result = 31 * result + (value != null ? value.hashCode() : 0);
-        result = 31 * result + (counter != null ? counter.hashCode() : 0);
+        result = 31 * result + (displayColumn != null ? displayColumn.hashCode() : 0);
         return result;
     }
 
     @Override
     public String toString() {
-        return "ParameterDesc [type=" + type + ", value=" + value + ", counter=" + counter + "]";
+        return "ParameterDesc [type=" + type + ", value=" + value + ", displayColumn=" + displayColumn + "]";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index c2d0f92..ed1fd4a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -4,7 +4,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -18,14 +20,12 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +57,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
     private Text outputKey = new Text();
     private Text outputValue = new Text();
     private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private Map<Integer, Dictionary<String>> topNDisplayColDictMap;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -91,8 +92,24 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
         keyBytesBuf = new byte[colCount][];
 
+        initTopNDisplayColDictionaryMap();
         initNullBytes();
     }
+    
+    private void initTopNDisplayColDictionaryMap() { // maps the flat-table column index of each TopN measure's display column to that column's dictionary
+        topNDisplayColDictMap = Maps.newHashMap();
+        for (int measureIdx = 0; measureIdx < measures.length; measureIdx++) {
+            MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
+            FunctionDesc func = measureDesc.getFunction();
+            if (func.isTopN()) {
+                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+                int displayColIdx = flatTableIdx[flatTableIdx.length - 1]; // the display column is the last parameter column
+                TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
+                Dictionary<String> dictionary = (Dictionary<String>)cubeSegment.getDictionary(displayCol); // NOTE(review): unchecked cast; presumably null if no dictionary was built for this column -- confirm, since the map value is used without a null check
+                topNDisplayColDictMap.put(displayColIdx, dictionary);
+            }
+        }
+    }
 
     private void initNullBytes() {
         nullBytes = Lists.newArrayList();
@@ -147,27 +164,49 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         // constant
         if (flatTableIdx == null) {
             result = Bytes.toBytes(paramDesc.getValue());
+        } 
+        // count and count distinct
+        else if (func.isCount() || func.isHolisticCountDistinct()) {
+            // note for holistic count distinct, this value will be ignored
+            result = ONE;
         }
-        // column values
+        // topN, need encode the key column
+        else if(func.isTopN()) {
+            // encode the key column with dict, and get the counter column;
+            int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
+            Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
+            SplittedBytes keySplit = splitBuffers[keyColIndex]; // split buffers are reused per row: only the first 'length' bytes are valid
+            valueBuf.clear();
+            valueBuf.putInt(displayColDict.getSizeOfId());
+            valueBuf.putInt(displayColDict.getIdFromValue(Bytes.toString(keySplit.value, 0, keySplit.length)));
+            if (flatTableIdx.length == 1) {
+                // only displayCol, use 1.0 as counter
+                valueBuf.putDouble(1.0);
+            } else {
+                SplittedBytes counterSplit = splitBuffers[flatTableIdx[0]]; // the counter column is the first parameter column
+                valueBuf.putDouble(Double.parseDouble(Bytes.toString(counterSplit.value, 0, counterSplit.length)));
+            }
+            // copy out just the bytes written (int sizeOfId, int id, double counter), not the whole shared row buffer
+            result = Arrays.copyOf(valueBuf.array(), valueBuf.position());
+            
+        } 
+        // normal case, concat column values
         else {
-            // for multiple columns, their values are joined
-            for (int i = 0; i < flatTableIdx.length; i++) {
-                SplittedBytes split = splitBuffers[flatTableIdx[i]];
-                if (result == null) {
-                    result = Arrays.copyOf(split.value, split.length);
-                } else {
-                    byte[] newResult = new byte[result.length + split.length];
-                    System.arraycopy(result, 0, newResult, 0, result.length);
-                    System.arraycopy(split.value, 0, newResult, result.length, split.length);
-                    result = newResult;
+                // for multiple columns, their values are joined
+                for (int i = 0; i < flatTableIdx.length; i++) {
+                    SplittedBytes split = splitBuffers[flatTableIdx[i]];
+                    if (result == null) {
+                        result = Arrays.copyOf(split.value, split.length);
+                    } else {
+                        byte[] newResult = new byte[result.length + split.length];
+                        System.arraycopy(result, 0, newResult, 0, result.length);
+                        System.arraycopy(split.value, 0, newResult, result.length, split.length);
+                        result = newResult;
+                    }
                 }
             }
-        }
 
-        if (func.isCount() || func.isHolisticCountDistinct()) {
-            // note for holistic count distinct, this value will be ignored
-            result = ONE;
-        }
+        
 
         if (isNull(result)) {
             result = null;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 098373a..b97c88a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -1,10 +1,5 @@
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -14,7 +9,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.KylinMapper;
@@ -24,6 +19,11 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  */
 public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, LongWritable, Text> {
@@ -33,7 +33,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     protected CubeSegment cubeSeg;
     protected CubeDesc cubeDesc;
     protected long baseCuboidId;
-    protected List<TblColRef> columns;
+    protected List<TblColRef> dictionaryColumns;
     protected ArrayList<Integer> factDictCols;
     protected IMRTableInputFormat flatTableInputFormat;
 
@@ -41,6 +41,9 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     protected Text outputValue = new Text();
     protected int errorRecordCounter = 0;
 
+    protected CubeJoinedFlatTableDesc intermediateTableDesc;
+    protected int[] dictionaryColumnIndex;
+
     @Override
     protected void setup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
@@ -52,23 +55,30 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
         cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW);
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        columns = Cuboid.findById(cubeDesc, baseCuboidId).getColumns();
+        dictionaryColumns = cubeDesc.getAllColumnsNeedDictionary();
 
         factDictCols = new ArrayList<Integer>();
-        RowKeyDesc rowKey = cubeDesc.getRowkey();
         DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-        for (int i = 0; i < columns.size(); i++) {
-            TblColRef col = columns.get(i);
-            if (!rowKey.isUseDictionary(col))
-                continue;
+        for (int i = 0; i < dictionaryColumns.size(); i++) {
+            TblColRef col = dictionaryColumns.get(i);
 
-            String scanTable = dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
+            String scanTable = dictMgr.decideSourceData(cubeDesc.getModel(), "true", col).getTable();
             if (cubeDesc.getModel().isFactTable(scanTable)) {
                 factDictCols.add(i);
             }
         }
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        dictionaryColumnIndex = new int[factDictCols.size()];
+        for (int i = 0; i < factDictCols.size(); i++) {
+            Integer column = factDictCols.get(i);
+            TblColRef colRef = dictionaryColumns.get(column);
+            int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+            dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+        }
+
     }
 
     protected void handleErrorRecord(String[] record, Exception ex) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index fcc12e4..85312ff 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -77,9 +77,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         cubeDesc = cube.getDescriptor();
 
-        baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        columnList = baseCuboid.getColumns();
+        columnList = cubeDesc.getAllColumnsNeedDictionary();
         collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
         statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 86b3605..e43d5d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -23,7 +23,9 @@ import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
@@ -36,14 +38,13 @@ import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
+import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  * @author yangli9
  */
 public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
 
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
-
     protected boolean collectStatistics = false;
     protected CuboidScheduler cuboidScheduler = null;
     protected int nRowKey;
@@ -58,7 +59,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     @Override
     protected void setup(Context context) throws IOException {
         super.setup(context);
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+
+        
         collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
         if (collectStatistics) {
             SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
@@ -111,9 +113,9 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         String[] row = flatTableInputFormat.parseMapperInput(record);
         try {
-            for (int i : factDictCols) {
-                outputKey.set((long) i);
-                String fieldValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+            for (int i = 0; i < factDictCols.size(); i++) {
+                outputKey.set((long) factDictCols.get(i));
+                String fieldValue = row[dictionaryColumnIndex[i]];
                 if (fieldValue == null)
                     continue;
                 byte[] bytes = Bytes.toBytes(fieldValue);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
new file mode 100644
index 0000000..903fc15
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
@@ -0,0 +1,10 @@
+{
+  "uuid" : "33354455-a33e-4b69-83dd-0bb8b1f8c53b",
+  "last_modified" : 0,
+  "name" : "test_kylin_cube_topn",
+  "owner" : null,
+  "version" : null,
+  "descriptor" : "test_kylin_cube_topn_desc",
+  "segments" : [ ],
+  "create_time" : null
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
new file mode 100644
index 0000000..2e0e376
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
@@ -0,0 +1,146 @@
+{
+  "uuid": "4334a905-1fc6-4f67-985c-38fa5aeafd92",
+  "name": "test_kylin_cube_topn_desc",
+  "description": null,
+  "dimensions": [
+    {
+      "id": 0,
+      "name": "CAL_DT",
+      "table": "EDW.TEST_CAL_DT",
+      "column": null,
+      "derived": [
+        "WEEK_BEG_DT"
+      ],
+      "hierarchy": false
+    }
+  ],
+  "measures": [
+    {
+      "id": 1,
+      "name": "GMV_SUM",
+      "function": {
+        "expression": "SUM",
+        "parameter": {
+          "type": "column",
+          "value": "PRICE"
+        },
+        "returntype": "decimal(19,4)"
+      },
+      "dependent_measure_ref": null
+    },
+    {
+      "id": 2,
+      "name": "GMV_MIN",
+      "function": {
+        "expression": "MIN",
+        "parameter": {
+          "type": "column",
+          "value": "PRICE"
+        },
+        "returntype": "decimal(19,4)"
+      },
+      "dependent_measure_ref": null
+    },
+    {
+      "id": 3,
+      "name": "GMV_MAX",
+      "function": {
+        "expression": "MAX",
+        "parameter": {
+          "type": "column",
+          "value": "PRICE"
+        },
+        "returntype": "decimal(19,4)"
+      },
+      "dependent_measure_ref": null
+    },
+    {
+      "id": 4,
+      "name": "TRANS_CNT",
+      "function": {
+        "expression": "COUNT",
+        "parameter": {
+          "type": "constant",
+          "value": "1"
+        },
+        "returntype": "bigint"
+      },
+      "dependent_measure_ref": null
+    },
+    {
+      "id": 5,
+      "name": "ITEM_COUNT_SUM",
+      "function": {
+        "expression": "SUM",
+        "parameter": {
+          "type": "column",
+          "value": "ITEM_COUNT"
+        },
+        "returntype": "bigint"
+      },
+      "dependent_measure_ref": null
+    },
+    {
+      "id": 6,
+      "name": "TOP_SELLER",
+      "function": {
+        "expression": "TOP_N",
+        "parameter": {
+          "type": "column",
+          "value": "PRICE",
+          "displaycolumn": "seller_id"
+        },
+        "returntype": "topn(100)"
+      },
+      "dependent_measure_ref": null
+    }
+  ],
+  "rowkey": {
+    "rowkey_columns": [
+      {
+        "column": "cal_dt",
+        "length": 0,
+        "dictionary": "true",
+        "mandatory": false
+      }
+    ],
+    "aggregation_groups": [
+      [
+        "cal_dt"
+      ]
+    ]
+  },
+  "last_modified": 1422435345330,
+  "model_name": "test_kylin_inner_join_model_desc",
+  "null_string": null,
+  "hbase_mapping": {
+    "column_family": [
+      {
+        "name": "f1",
+        "columns": [
+          {
+            "qualifier": "m",
+            "measure_refs": [
+              "gmv_sum",
+              "gmv_min",
+              "gmv_max",
+              "trans_cnt",
+              "item_count_sum"
+            ]
+          }
+        ]
+      },  {
+        "name": "f2",
+        "columns": [
+          {
+            "qualifier": "m",
+            "measure_refs": [
+              "top_seller"
+            ]
+          }
+        ]
+      }
+    ]
+  },
+  "notify_list": null
+}
\ No newline at end of file