You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/10/26 06:41:09 UTC

[1/2] incubator-kylin git commit: # This is a combination of 10 commits. # The first commit's message is: KYLIN-1068 Optimize the memory footprint for TopN counter

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 06aeb994e -> d0e0d9fc9


# This is a combination of 10 commits.
# The first commit's message is:
KYLIN-1068 Optimize the memory footprint for TopN counter

# This is the 2nd commit message:

KYLIN-1068 code small refactor

# This is the 3rd commit message:

KYLIN-1068 remove “error” from Counter as it is invisible

# This is the 4th commit message:

KYLIN-1066 for sandbox test, limit max reducer to 5

# This is the 5th commit message:

KYLIN-1068 give bigger random range for PRICE column on test_kylin_fact table

# This is the 6th commit message:

KYLIN-1068 small changes

# This is the 7th commit message:

KYLIN-1068 add log in htable cleanup

# This is the 8th commit message:

KYLIN-1068 cleanup test case

# This is the 9th commit message:

KYLIN-1068 code refine

# This is the 10th commit message:

KYLIN-1068 only build 1 segment for test cubes to accelerate CI


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d1b2141d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d1b2141d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d1b2141d

Branch: refs/heads/2.x-staging
Commit: d1b2141dfad5309897f6efc64c5f743f3c10c0b3
Parents: 06aeb99
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 12 22:05:32 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 26 13:30:16 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/BuildCubeWithEngineTest.java      |  11 +-
 .../kylin/job/BuildCubeWithStreamTest.java      |   5 +-
 .../java/org/apache/kylin/job/DataGenTest.java  |   4 +-
 .../kylin/job/hadoop/invertedindex/IITest.java  |   1 -
 .../org/apache/kylin/common/topn/Counter.java   |  18 +-
 .../kylin/common/topn/DoublyLinkedList.java     |  84 ++-----
 .../apache/kylin/common/topn/TopNCounter.java   | 234 ++++++-------------
 .../kylin/common/topn/TopNCounterBasicTest.java |   4 +-
 .../common/topn/TopNCounterCombinationTest.java |   1 -
 .../kylin/common/topn/TopNCounterTest.java      |   2 +-
 .../kylin/metadata/measure/TopNAggregator.java  |   6 +-
 .../serializer/TopNCounterSerializer.java       |  23 +-
 .../localmeta/data/data_gen_config.json         |   8 +
 .../test_case_data/sandbox/kylin.properties     |   8 +-
 .../hbase/steps/HBaseMROutput2Transition.java   |   1 +
 .../storage/hbase/util/StorageCleanupJob.java   |   4 +-
 16 files changed, 153 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 5be11f1..26ad960 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -196,8 +196,8 @@ public class BuildCubeWithEngineTest {
         long date2 = f.parse("2013-01-01").getTime();
         long date3 = f.parse("2022-01-01").getTime();
         List<String> result = Lists.newArrayList();
-        result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
-        result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
+        result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date3));
+//        result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
         return result;
     }
 
@@ -229,6 +229,8 @@ public class BuildCubeWithEngineTest {
         final String cubeName = "test_kylin_cube_without_slr_left_join_empty";
         // this cube's start date is 0, end date is 20120601000000
         long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+        
+        /*
         long dateEnd = f.parse("2012-06-01").getTime();
 
         clearSegment(cubeName);
@@ -245,6 +247,11 @@ public class BuildCubeWithEngineTest {
         dateEnd = f.parse("2023-01-01").getTime();
         result.add(buildSegment(cubeName, dateStart, dateEnd));
 
+*/
+        long dateEnd = f.parse("2023-01-01").getTime();
+
+        clearSegment(cubeName);
+        result.add(buildSegment(cubeName, dateStart, dateEnd));
         return result;
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 07ab947..63bba70 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -113,7 +113,10 @@ public class BuildCubeWithStreamTest {
     private static void backup() throws Exception {
         int exitCode = cleanupOldStorage();
         if (exitCode == 0) {
-            exportHBaseData();
+            String exportHTables = System.getProperty("kylinExportHTables");
+            if (Boolean.parseBoolean(exportHTables) == true) {
+                exportHBaseData();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java b/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
index 5c01305..af4f9fb 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -46,11 +46,11 @@ public class DataGenTest extends LocalFileMetadataTestCase {
     @Test
     public void testBasics() throws Exception {
         String content = FactTableGenerator.generate("test_kylin_cube_with_slr_ready", "10000", "1", null);// default  settings
-        System.out.println(content);
+        //System.out.println(content);
         assertTrue(content.contains("FP-non GTC"));
         assertTrue(content.contains("ABIN"));
 
-        DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
+        //DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 2a643c8..73f4098 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -69,7 +69,6 @@ public class IITest extends LocalFileMetadataTestCase {
     String iiName = "test_kylin_ii_inner_join";
     IIInstance ii;
     IIDesc iiDesc;
-    String cubeName = "test_kylin_cube_with_slr_empty";
 
     List<IIRow> iiRows;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 866d3d8..2bca4df 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -31,11 +31,9 @@ import java.io.ObjectOutput;
  */
 public class Counter<T> implements Externalizable {
 
-    protected ListNode2<TopNCounter<T>.Bucket> bucketNode;
-
     protected T item;
     protected double count;
-    protected double error;
+//    protected double error;
 
     /**
      * For de-serialization
@@ -43,10 +41,9 @@ public class Counter<T> implements Externalizable {
     public Counter() {
     }
 
-    public Counter(ListNode2<TopNCounter<T>.Bucket> bucket, T item) {
-        this.bucketNode = bucket;
+    public Counter(T item) {
         this.count = 0;
-        this.error = 0;
+//        this.error = 0;
         this.item = item;
     }
 
@@ -58,13 +55,14 @@ public class Counter<T> implements Externalizable {
         return count;
     }
 
-    public double getError() {
-        return error;
-    }
+//    public double getError() {
+//        return error;
+//    }
 
     @Override
     public String toString() {
-        return item + ":" + count + ':' + error;
+//        return item + ":" + count + ':' + error;
+        return item + ":" + count;
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
index 0942b84..d268a30 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -26,11 +26,11 @@ import java.util.Iterator;
  * 
  * @param <T>
  */
-public class DoublyLinkedList<T> implements Iterable<T> {
+public class DoublyLinkedList<T> {
 
-    protected int size;
-    protected ListNode2<T> tail;
-    protected ListNode2<T> head;
+    private int size;
+    private ListNode2<T> tail;
+    private ListNode2<T> head;
 
     /**
      * Append to head of list
@@ -54,6 +54,11 @@ public class DoublyLinkedList<T> implements Iterable<T> {
      */
     public ListNode2<T> enqueue(T value) {
         ListNode2<T> node = new ListNode2<T>(value);
+       
+        return enqueue(node);
+    }
+
+    public ListNode2<T> enqueue(ListNode2<T> node) {
         if (size++ == 0) {
             head = node;
         } else {
@@ -97,6 +102,19 @@ public class DoublyLinkedList<T> implements Iterable<T> {
         size++;
     }
 
+
+    public void addBefore(ListNode2<T> node, ListNode2<T> newNode) {
+        newNode.prev = node.prev;
+        newNode.next = node;
+        node.prev = newNode;
+        if (newNode.prev == null) {
+            tail = newNode;
+        } else {
+            newNode.prev.next = newNode;
+        }
+        size++;
+    }
+
     public void remove(ListNode2<T> node) {
         if (node == tail) {
             tail = node.next;
@@ -115,54 +133,7 @@ public class DoublyLinkedList<T> implements Iterable<T> {
     public int size() {
         return size;
     }
-
-
-    @Override
-    public Iterator<T> iterator() {
-        return new DoublyLinkedListIterator(this);
-    }
-
-    protected class DoublyLinkedListIterator implements Iterator<T> {
-
-        protected DoublyLinkedList<T> list;
-        protected ListNode2<T> itr;
-        protected int length;
-
-        public DoublyLinkedListIterator(DoublyLinkedList<T> list) {
-            this.length = list.size;
-            this.list = list;
-            this.itr = list.tail;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return itr != null;
-        }
-
-        @Override
-        public T next() {
-            if (length != list.size) {
-                throw new ConcurrentModificationException();
-            }
-            T next = itr.value;
-            itr = itr.next;
-            return next;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-    }
-
-    public T first() {
-        return tail == null ? null : tail.getValue();
-    }
-
-    public T last() {
-        return head == null ? null : head.getValue();
-    }
+    
 
     public ListNode2<T> head() {
         return head;
@@ -176,13 +147,4 @@ public class DoublyLinkedList<T> implements Iterable<T> {
         return size == 0;
     }
 
-    @SuppressWarnings("unchecked")
-    public T[] toArray() {
-        T[] a = (T[]) new Object[size];
-        int i = 0;
-        for (T v : this) {
-            a[i++] = v;
-        }
-        return a;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 6814b8d..1856010 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -21,8 +21,6 @@ package org.apache.kylin.common.topn;
 import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.Pair;
 
-import java.io.*;
-import java.nio.ByteBuffer;
 import java.util.*;
 
 /**
@@ -36,29 +34,12 @@ import java.util.*;
  * @param <T> type of data in the stream to be summarized
  */
 public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
-    
-    public static final int EXTRA_SPACE_RATE = 50;
-
-
-    protected class Bucket {
-
-        protected DoublyLinkedList<Counter<T>> counterList;
-
-        private double count;
 
-        public Bucket(double count) {
-            this.count = count;
-            this.counterList = new DoublyLinkedList<Counter<T>>();
-        }
-
-        public int size() {
-            return counterList.size();
-        }
-    }
+    public static final int EXTRA_SPACE_RATE = 50;
 
     protected int capacity;
     private HashMap<T, ListNode2<Counter<T>>> counterMap;
-    protected DoublyLinkedList<Bucket> bucketList;
+    protected DoublyLinkedList<Counter<T>> counterList;
 
     /**
      * @param capacity maximum size (larger capacities improve accuracy)
@@ -66,7 +47,7 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public TopNCounter(int capacity) {
         this.capacity = capacity;
         counterMap = new HashMap<T, ListNode2<Counter<T>>>();
-        bucketList = new DoublyLinkedList<Bucket>();
+        counterList = new DoublyLinkedList<Counter<T>>();
     }
 
     public int getCapacity() {
@@ -114,15 +95,14 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
         if (isNewItem) {
 
             if (size() < capacity) {
-                counterNode = bucketList.enqueue(new Bucket(0)).getValue().counterList.add(new Counter<T>(bucketList.tail(), item));
+                counterNode = counterList.enqueue(new Counter<T>(item));
             } else {
-                Bucket min = bucketList.first();
-                counterNode = min.counterList.tail();
+                counterNode = counterList.tail();
                 Counter<T> counter = counterNode.getValue();
                 droppedItem = counter.item;
                 counterMap.remove(droppedItem);
                 counter.item = item;
-                counter.error = min.count;
+                counter.count = 0.0;
             }
             counterMap.put(item, counterNode);
         }
@@ -133,56 +113,35 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     }
 
     protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
-        Counter<T> counter = counterNode.getValue(); // count_i
-        ListNode2<Bucket> oldNode = counter.bucketNode;
-        Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
-        bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
-        counter.count = counter.count + incrementCount;
-
-        // Finding the right bucket for count_i
-        // Because we allow a single call to increment count more than once, this may not be the adjacent bucket. 
-        ListNode2<Bucket> bucketNodePrev = oldNode;
-        ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
-        while (bucketNodeNext != null) {
-            Bucket bucketNext = bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
-            if (counter.count == bucketNext.count) {
-                bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
-                break;
-            } else if (counter.count > bucketNext.count) {
-                bucketNodePrev = bucketNodeNext;
-                bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
-            } else {
-                // A new bucket has to be created
-                bucketNodeNext = null;
-            }
+        Counter<T> counter = counterNode.getValue();
+        counter.count += incrementCount;
+
+        ListNode2<Counter<T>> nodeNext = counterNode.getNext();
+        counterList.remove(counterNode);
+        counterNode.prev = null;
+        counterNode.next = null;
+        while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
+            nodeNext = nodeNext.getNext();
         }
 
-        if (bucketNodeNext == null) {
-            Bucket bucketNext = new Bucket(counter.count);
-            bucketNext.counterList.add(counterNode);
-            bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
+        if (nodeNext != null) {
+            counterList.addBefore(nodeNext, counterNode);
+        } else {
+            counterList.add(counterNode);
         }
-        counter.bucketNode = bucketNodeNext;
 
-        //Cleaning up
-        if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
-        {
-            bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
-        }
     }
 
     @Override
     public List<T> peek(int k) {
         List<T> topK = new ArrayList<T>(k);
 
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                if (topK.size() == k) {
-                    return topK;
-                }
-                topK.add(c.item);
+        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            if (topK.size() == k) {
+                return topK;
             }
+            topK.add(b.item);
         }
 
         return topK;
@@ -191,14 +150,12 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public List<Counter<T>> topK(int k) {
         List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
 
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                if (topK.size() == k) {
-                    return topK;
-                }
-                topK.add(c);
+        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            if (topK.size() == k) {
+                return topK;
             }
+            topK.add(b);
         }
 
         return topK;
@@ -215,48 +172,27 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append('[');
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Bucket b = bNode.getValue();
-            sb.append('{');
+        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            sb.append(b.item);
+            sb.append(':');
             sb.append(b.count);
-            sb.append(":[");
-            for (Counter<T> c : b.counterList) {
-                sb.append('{');
-                sb.append(c.item);
-                sb.append(':');
-                sb.append(c.error);
-                sb.append("},");
-            }
-            if (b.counterList.size() > 0) {
-                sb.deleteCharAt(sb.length() - 1);
-            }
-            sb.append("]},");
-        }
-        if (bucketList.size() > 0) {
-            sb.deleteCharAt(sb.length() - 1);
         }
         sb.append(']');
         return sb.toString();
     }
 
-    public void fromExternal(int size, double[] counters, List<T> items) {
-        this.bucketList = new DoublyLinkedList<Bucket>();
-
-        this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
-
-        Bucket currentBucket = null;
-        ListNode2<Bucket> currentBucketNode = null;
-        for (int i = 0; i < size; i++) {
-            Counter<T> c = new Counter<T>();
-            c.count = counters[i];
-            c.item = items.get(i);
-            if (currentBucket == null || c.count != currentBucket.count) {
-                currentBucket = new Bucket(c.count);
-                currentBucketNode = bucketList.add(currentBucket);
-            }
-            c.bucketNode = currentBucketNode;
-            counterMap.put(c.item, currentBucket.counterList.add(c));
-        }
+    /**
+     * Puts the element at the head position.
+     * The caller should invoke this method with counts in ascending order; the item is put directly at the head of the list, without any comparison, for best performance.
+     * @param item
+     * @param count
+     */
+    public void offerToHead(T item, double count) {
+        Counter<T> c = new Counter<T>(item);
+        c.count = count;
+        ListNode2<Counter<T>> node = counterList.add(c);
+        counterMap.put(c.item, node);
     }
 
     /**
@@ -273,33 +209,29 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public TopNCounter<T> merge(TopNCounter<T> another) {
         double m1 = 0.0, m2 = 0.0;
         if (this.size() >= this.capacity) {
-            m1 = this.bucketList.tail().getValue().count;
+            m1 = this.counterList.tail().getValue().count;
         }
 
         if (another.size() >= another.capacity) {
-            m2 = another.bucketList.tail().getValue().count;
+            m2 = another.counterList.tail().getValue().count;
         }
-
+        
         for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
             T item = entry.getKey();
             ListNode2<Counter<T>> existing = another.counterMap.get(item);
             if (existing != null) {
                 this.offer(item, another.counterMap.get(item).getValue().count);
-                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + another.counterMap.get(item).getValue().error;
 
                 another.counterMap.remove(item);
             } else {
                 this.offer(item, m2);
-                this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
             }
         }
 
         for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
             T item = entry.getKey();
             double counter = entry.getValue().getValue().count;
-            double error = entry.getValue().getValue().error;
             this.offer(item, counter + m1);
-            this.counterMap.get(item).getValue().error = error + m1;
         }
 
         return this;
@@ -313,17 +245,13 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
         assert newCapacity > 0;
         this.capacity = newCapacity;
         if (newCapacity < this.size()) {
-            ListNode2<Bucket> tail = bucketList.tail;
+            ListNode2<Counter<T>> tail = counterList.tail();
             while (tail != null && this.size() > newCapacity) {
-                Bucket bucket = tail.getValue();
-
-                for (Counter<T> counter : bucket.counterList) {
-                    this.counterMap.remove(counter.getItem());
-                }
-                tail = tail.getNext();
+                Counter<T> bucket = tail.getValue();
+                this.counterMap.remove(bucket.getItem());
+                this.counterList.remove(tail);
+                tail = this.counterList.tail();
             }
-
-            tail.next = null;
         }
 
     }
@@ -336,14 +264,13 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
         double[] counters = new double[size()];
         int index = 0;
 
-        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                counters[index] = c.count;
-                index ++;
-            }
+        for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
+            Counter<T> b = bNode.getValue();
+            counters[index] = b.count;
+            index++;
         }
 
+        assert index == size();
         return counters;
     }
 
@@ -353,13 +280,12 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
      */
     public List<T> getItems() {
         List<T> items = Lists.newArrayList();
-        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                items.add(c.item);
-            }
+        for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
+            Counter<T> b = bNode.getValue();
+            items.add(b.item);
         }
 
+        assert items.size() == this.size();
         return items;
 
     }
@@ -368,46 +294,34 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public Iterator<Counter<T>> iterator() {
         return new TopNCounterIterator();
     }
-    
+
+    /**
+     * Iterates from the tail (smallest) to the head (biggest).
+     */
     private class TopNCounterIterator implements Iterator {
 
-        private ListNode2<Bucket> currentBNode;
-        private Iterator<Counter<T>> currentCounterIterator;
-        
+        private ListNode2<Counter<T>> currentBNode;
+
         private TopNCounterIterator() {
-            currentBNode = bucketList.head();
-            if (currentBNode != null && currentBNode.getValue() != null) {
-                currentCounterIterator = currentBNode.getValue().counterList.iterator();
-            }
+            currentBNode = counterList.tail();
         }
-        
+
         @Override
         public boolean hasNext() {
-            if (currentCounterIterator == null) {
-                return false;
-            }
-            
-            if (currentCounterIterator.hasNext()) {
-                return true;
-            }
+            return currentBNode != null;
 
-            currentBNode = currentBNode.getPrev();
-            
-            if (currentBNode == null)
-                return false;
-
-            currentCounterIterator = currentBNode.getValue().counterList.iterator();
-            return hasNext();
         }
 
         @Override
         public Counter<T> next() {
-            return currentCounterIterator.next();
+            Counter<T> counter = currentBNode.getValue();
+            currentBNode = currentBNode.getNext();
+            return counter;
         }
 
         @Override
         public void remove() {
             throw new UnsupportedOperationException();
         }
-    } 
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 252e955..771df58 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -37,7 +37,7 @@ public class TopNCounterBasicTest {
         List<Counter<String>> topk = vs.topK(6);
 
         for (Counter<String> top : topk) {
-            System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
+            System.out.println(top.getItem() + ":" + top.getCount());
         }
 
     }
@@ -110,7 +110,7 @@ public class TopNCounterBasicTest {
     public void testMerge() {
 
         TopNCounter<String> vs = new TopNCounter<String>(10);
-        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B" };
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B", "A" };
         for (String i : stream) {
             vs.offer(i);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index a866289..d2d93b8 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -29,7 +29,6 @@ public class TopNCounterCombinationTest extends TopNCounterTest {
 
     @Parameterized.Parameters
     public static Collection<Integer[]> configs() {
-        //       return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
         return Arrays.asList(new Integer[][] {
                 // with 20X space
                 { 10, 20 }, // top 10%

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index dab45b1..a6aa610 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -165,7 +165,7 @@ public class TopNCounterTest {
         if (consumers.length == 1)
             return consumers;
 
-        TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * PARALLEL);
+        TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
         
         for (int i=0, n=consumers.length; i<n; i++) {
             merged.vs.merge(consumers[i].vs);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
index fde179a..4b35a7a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -38,8 +38,8 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
     @Override
     public void aggregate(TopNCounter<ByteArray> value) {
         if (sum == null) {
-            sum = new TopNCounter<ByteArray>(Integer.MAX_VALUE);
             capacity = value.getCapacity();
+            sum = new TopNCounter<ByteArray>(capacity);
         }
 
         sum.merge(value);
@@ -48,13 +48,13 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
     @Override
     public TopNCounter<ByteArray> getState() {
         
-        sum.retain(capacity);
+        //sum.retain(capacity);
         return sum;
     }
 
     @Override
     public int getMemBytesEstimate() {
-        return 8 * capacity;
+        return 8 * capacity / 4;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
index d4472f2..e0f293b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.metadata.measure.serializer;
 
-import com.google.common.collect.Lists;
-
+import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.DoubleDeltaSerializer;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
@@ -27,6 +26,7 @@ import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.model.DataType;
 
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -83,15 +83,15 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
     @Override
     public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
         double[] counters = value.getCounters();
-        List<ByteArray> items = value.getItems();
-        int keyLength = items.size() > 0 ? items.get(0).length() : 0;
+        List<ByteArray> peek = value.peek(1);
+        int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
         out.putInt(value.getCapacity());
         out.putInt(value.size());
         out.putInt(keyLength);
         dds.serialize(counters, out);
-
-        for (ByteArray item : items) {
-            out.put(item.array());
+        Iterator<Counter<ByteArray>> iterator = value.iterator();
+        while (iterator.hasNext()) {
+            out.put(iterator.next().getItem().array());
         }
     }
 
@@ -101,16 +101,15 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
         int size = in.getInt();
         int keyLength = in.getInt();
         double[] counters = dds.deserialize(in);
-        List<ByteArray> items = Lists.newArrayList();
 
+        TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
+        ByteArray byteArray;
         for (int i = 0; i < size; i++) {
-            ByteArray byteArray = new ByteArray(keyLength);
+            byteArray = new ByteArray(keyLength);
             in.get(byteArray.array());
-            items.add(byteArray);
+            counter.offerToHead(byteArray, counters[i]);
         }
 
-        TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
-        counter.fromExternal(size, counters, items);
         return counter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/examples/test_case_data/localmeta/data/data_gen_config.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/data_gen_config.json b/examples/test_case_data/localmeta/data/data_gen_config.json
index 2da72b7..ff3f676 100644
--- a/examples/test_case_data/localmeta/data/data_gen_config.json
+++ b/examples/test_case_data/localmeta/data/data_gen_config.json
@@ -26,6 +26,14 @@
         "2000000"
       ],
       "asRange": true
+    },
+    {
+      "columnName": "PRICE",
+      "valueSet": [
+        "0",
+        "1000"
+      ],
+      "asRange": true
     }
   ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 03df875..35e2927 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -33,11 +33,11 @@ kylin.job.remote.cli.working.dir=/tmp/kylin
 # Max count of concurrent jobs running
 kylin.job.concurrent.max.limit=10
 
-# Whether calculate cube in mem in each mapper;
-kylin.job.cubing.inMem=true
+# Max reducer number
+kylin.job.mapreduce.max.reducer.number=5
 
-#the percentage of the sampling, default 25%
-kylin.job.cubing.inMem.sampling.percent=25
+#the percentage of the sampling, default 100%
+kylin.job.cubing.inMem.sampling.percent=100
 
 # The cut size for hbase region, in GB.
 # E.g, for cube whose capacity be marked as "SMALL", split region per 10GB by default

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 10f4661..e7c4cf5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -370,6 +370,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             htable.close();
             
             int reducerNum = regions * 3;
+            reducerNum = Math.max(1, reducerNum);
             KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             reducerNum = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), reducerNum);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 490c580..88cb7de 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -101,7 +101,7 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         }
     }
 
-    private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+    private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
 
@@ -117,6 +117,8 @@ public class StorageCleanupJob extends AbstractHadoopJob {
                 //only take care htables that belongs to self, and created more than 2 days
                 if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) {
                     allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
+                } else {
+                    logger.info("Exclude table " + desc.getTableName().getNameAsString() + " from drop list, as it is newly created");
                 }
             }
         }



[2/2] incubator-kylin git commit: KYLIN-1068 Optimize the memory footprint for TopN counter

Posted by sh...@apache.org.
KYLIN-1068 Optimize the memory footprint for TopN counter

KYLIN-1068 update TopN merge method so that it does not change the to-be-merged object

KYLIN-1068 add sanity check for TopNCounter

KYLIN-1068 sanityCheck for TopN

KYLIN-1068 fix a concurrent modification bug

KYLIN-1068 add debug info

KYLIN-1068 disable TopNCounter basic test

KYLIN-1068 add debug

KYLIN-1068 add debug

KYLIN-1068 add check in TopNAggregator and DoubleDeltaSerializer

KYLIN-1068 fix concurrent bug in DoubleDeltaSerializer and cleanup debug

KYLIN-1068 reuse array in DoubleDeltaSerializer to avoid OOM error

KYLIN-1068 increase reuseDelta size automatically

KYLIN-1068 remove the sanity check in TopNAggregator


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d0e0d9fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d0e0d9fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d0e0d9fc

Branch: refs/heads/2.x-staging
Commit: d0e0d9fc9e7c6377dcb2a0d9ae71cabcfa98cdcc
Parents: d1b2141
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 12 22:05:32 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 26 13:35:24 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/BuildCubeWithEngineTest.java      | 14 +---
 build/script/compress.sh                        | 19 ++---
 .../common/topn/DoubleDeltaSerializer.java      | 22 ++++--
 .../kylin/common/topn/DoublyLinkedList.java     | 11 +--
 .../apache/kylin/common/topn/TopNCounter.java   | 83 +++++++++++---------
 .../kylin/common/topn/TopNCounterBasicTest.java |  1 +
 .../cube/inmemcubing/InMemCubeBuilder.java      | 33 +++++---
 .../kylin/gridtable/GTAggregateScanner.java     |  2 +-
 .../kylin/metadata/measure/TopNAggregator.java  | 10 ++-
 .../measure/serializer/DataTypeSerializer.java  |  2 +
 .../serializer/TopNCounterSerializer.java       |  2 +-
 .../java/org/apache/kylin/rest/DebugTomcat.java |  2 +-
 12 files changed, 113 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 26ad960..af3dc43 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -196,8 +196,8 @@ public class BuildCubeWithEngineTest {
         long date2 = f.parse("2013-01-01").getTime();
         long date3 = f.parse("2022-01-01").getTime();
         List<String> result = Lists.newArrayList();
-        result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date3));
-//        result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
+        result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
+        result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
         return result;
     }
 
@@ -230,7 +230,6 @@ public class BuildCubeWithEngineTest {
         // this cube's start date is 0, end date is 20120601000000
         long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
         
-        /*
         long dateEnd = f.parse("2012-06-01").getTime();
 
         clearSegment(cubeName);
@@ -238,20 +237,15 @@ public class BuildCubeWithEngineTest {
 
         // then submit an append job, start date is 20120601000000, end
         // date is 20220101000000
-        dateStart = f.parse("2012-06-01").getTime();
+        dateStart = dateEnd;
         dateEnd = f.parse("2022-01-01").getTime();
         result.add(buildSegment(cubeName, dateStart, dateEnd));
 
         // build an empty segment which doesn't have data
-        dateStart = f.parse("2022-01-01").getTime();
+        dateStart = dateEnd;
         dateEnd = f.parse("2023-01-01").getTime();
         result.add(buildSegment(cubeName, dateStart, dateEnd));
 
-*/
-        long dateEnd = f.parse("2023-01-01").getTime();
-
-        clearSegment(cubeName);
-        result.add(buildSegment(cubeName, dateStart, dateEnd));
         return result;
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/build/script/compress.sh
----------------------------------------------------------------------
diff --git a/build/script/compress.sh b/build/script/compress.sh
index c70e567..cfd3c18 100755
--- a/build/script/compress.sh
+++ b/build/script/compress.sh
@@ -13,16 +13,17 @@ fi
 
 #package tar.gz
 echo 'package tar.gz'
+package_name=apache-kylin-${version}-bin
 cd build/
-rm -rf kylin-${version}
-mkdir kylin-${version}
-cp -r lib bin conf tomcat ../examples/sample_cube commit_SHA1 kylin-${version}
+rm -rf ${package_name}
+mkdir ${package_name}
+cp -r lib bin conf tomcat ../examples/sample_cube commit_SHA1 ${package_name}
 rm -rf lib tomcat commit_SHA1
-find kylin-${version} -type d -exec chmod 755 {} \;
-find kylin-${version} -type f -exec chmod 644 {} \;
-find kylin-${version} -type f -name "*.sh" -exec chmod 755 {} \;
+find ${package_name} -type d -exec chmod 755 {} \;
+find ${package_name} -type f -exec chmod 644 {} \;
+find ${package_name} -type f -name "*.sh" -exec chmod 755 {} \;
 mkdir -p ../dist
-tar -cvzf ../dist/kylin-${version}.tar.gz kylin-${version}
-rm -rf kylin-${version}
+tar -cvzf ../dist/${package_name}.tar.gz ${package_name}
+rm -rf ${package_name}
 
-echo "Package ready: dist/kylin-${version}.tar.gz"
+echo "Package ready: dist/${package_name}.tar.gz"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java
index cd798e0..52dee77 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java
@@ -41,7 +41,8 @@ public class DoubleDeltaSerializer {
     final private int precision;
     final private int multiplier;
 
-    transient long[] reuseDeltas;
+
+    transient ThreadLocal<long[]> deltasThreadLocal;
 
     public DoubleDeltaSerializer() {
         this(2);
@@ -55,6 +56,7 @@ public class DoubleDeltaSerializer {
 
         this.precision = precision;
         this.multiplier = (int) Math.pow(10, precision);
+        this.deltasThreadLocal = new ThreadLocal<long[]>();
     }
 
     public void serialize(double values[], ByteBuffer buf) {
@@ -110,21 +112,24 @@ public class DoubleDeltaSerializer {
     private long[] calculateDeltas(double[] values) {
         int len = values.length - 1;
         len = Math.max(0, len);
-        if (reuseDeltas == null || reuseDeltas.length < len) {
-            reuseDeltas = new long[len];
+
+        long[] deltas = deltasThreadLocal.get();
+        if (deltas == null || deltas.length < len) {
+            deltas = new long[len];
+            deltasThreadLocal.set(deltas);
         }
-        
+                
         if (len == 0)
-            return reuseDeltas;
+            return deltas;
 
         long current = roundAndPromote(values[0]);
         for (int i = 0; i < len; i++) {
             long next = roundAndPromote(values[i + 1]);
-            reuseDeltas[i] = next - current;
-            assert reuseDeltas[i] >= 0;
+            deltas[i] = next - current;
+            assert deltas[i] >= 0;
             current = next;
         }
-        return reuseDeltas;
+        return deltas;
     }
 
     private long roundAndPromote(double value) {
@@ -142,6 +147,7 @@ public class DoubleDeltaSerializer {
         
         double[] result = new double[len];
         deserialize(buf, meta, result);
+        
         return result;
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
index d268a30..1520ce1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -28,7 +28,7 @@ import java.util.Iterator;
  */
 public class DoublyLinkedList<T> {
 
-    private int size;
+    private int size = 0;
     private ListNode2<T> tail;
     private ListNode2<T> head;
 
@@ -37,14 +37,7 @@ public class DoublyLinkedList<T> {
      */
     public ListNode2<T> add(T value) {
         ListNode2<T> node = new ListNode2<T>(value);
-        if (size++ == 0) {
-            tail = node;
-        } else {
-            node.prev = head;
-            head.next = node;
-        }
-
-        head = node;
+        add(node);
 
         return node;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 1856010..e6c5c90 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.common.topn;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.kylin.common.util.Pair;
 
 import java.util.*;
@@ -116,20 +117,40 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
         Counter<T> counter = counterNode.getValue();
         counter.count += incrementCount;
 
-        ListNode2<Counter<T>> nodeNext = counterNode.getNext();
+        ListNode2<Counter<T>> nodeNext; 
+                
+        if (incrementCount > 0) {
+            nodeNext = counterNode.getNext();
+        } else {
+            nodeNext = counterNode.getPrev();
+        }
         counterList.remove(counterNode);
         counterNode.prev = null;
         counterNode.next = null;
-        while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
-            nodeNext = nodeNext.getNext();
-        }
 
-        if (nodeNext != null) {
-            counterList.addBefore(nodeNext, counterNode);
+        if (incrementCount > 0) {
+            while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
+                nodeNext = nodeNext.getNext();
+            }
+            if (nodeNext != null) {
+                counterList.addBefore(nodeNext, counterNode);
+            } else {
+                counterList.add(counterNode);
+            }
+            
         } else {
-            counterList.add(counterNode);
+            while (nodeNext != null && counter.count < nodeNext.getValue().count) {
+                nodeNext = nodeNext.getPrev();
+            }
+            if (nodeNext != null) {
+                counterList.addAfter(nodeNext, counterNode);
+            } else {
+                counterList.enqueue(counterNode);
+            }
         }
 
+       
+
     }
 
     @Override
@@ -196,13 +217,7 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     }
 
     /**
-     * For de-serialization
-     */
-    public TopNCounter() {
-    }
-
-    /**
-     * Merge another counter into this counter; Note, the other counter will be changed in this method; please make a copy and passed in here;
+     * Merge another counter into this counter;
      * @param another
      * @return
      */
@@ -216,22 +231,33 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
             m2 = another.counterList.tail().getValue().count;
         }
         
+        Set<T> duplicateItems = Sets.newHashSet(); 
+        List<T> notDuplicateItems = Lists.newArrayList();
+        
         for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
             T item = entry.getKey();
             ListNode2<Counter<T>> existing = another.counterMap.get(item);
             if (existing != null) {
-                this.offer(item, another.counterMap.get(item).getValue().count);
-
-                another.counterMap.remove(item);
+                duplicateItems.add(item);
             } else {
-                this.offer(item, m2);
+                notDuplicateItems.add(item);
             }
         }
 
+        for(T item : duplicateItems) {
+            this.offer(item, another.counterMap.get(item).getValue().count);
+        }
+        
+        for(T item : notDuplicateItems) {
+            this.offer(item, m2);
+        }
+
         for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
             T item = entry.getKey();
-            double counter = entry.getValue().getValue().count;
-            this.offer(item, counter + m1);
+            if (duplicateItems.contains(item) == false) {
+                double counter = entry.getValue().getValue().count;
+                this.offer(item, counter + m1);
+            }
         }
 
         return this;
@@ -274,22 +300,6 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
         return counters;
     }
 
-    /**
-     * Get the item list order by counter values in ascending order
-     * @return
-     */
-    public List<T> getItems() {
-        List<T> items = Lists.newArrayList();
-        for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Counter<T> b = bNode.getValue();
-            items.add(b.item);
-        }
-
-        assert items.size() == this.size();
-        return items;
-
-    }
-
     @Override
     public Iterator<Counter<T>> iterator() {
         return new TopNCounterIterator();
@@ -324,4 +334,5 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
             throw new UnsupportedOperationException();
         }
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 771df58..c3e941b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -16,6 +16,7 @@
 
 package org.apache.kylin.common.topn;
 
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 1666fc2..7d943a3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -30,10 +30,9 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.*;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
@@ -463,10 +462,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             mask = mask >> 1;
         }
 
-        return scanAndAggregateGridTable(parent.table, cuboidId, childDimensions, measureColumns);
+        return scanAndAggregateGridTable(parent.table, parent.cuboidId, cuboidId, childDimensions, measureColumns);
     }
 
-    private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+    private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
         long startTime = System.currentTimeMillis();
         logger.info("Calculating cuboid " + cuboidId);
 
@@ -489,10 +488,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                 builder.write(newRecord);
             }
 
-            // disable sanity check for performance
-            long t = System.currentTimeMillis();
-            sanityCheck(scanner.getTotalSumForSanityCheck());
-            logger.info("sanity check for Cuboid " + cuboidId + " cost " + (System.currentTimeMillis() - t) + "ms");
+            //long t = System.currentTimeMillis();
+            //sanityCheck(parentId, cuboidId, scanner.getTotalSumForSanityCheck());
+            //logger.info("sanity check for Cuboid " + cuboidId + " cost " + (System.currentTimeMillis() - t) + "ms");
         } finally {
             scanner.close();
             builder.close();
@@ -505,12 +503,22 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     //@SuppressWarnings("unused")
-    private void sanityCheck(Object[] totalSum) {
+    private void sanityCheck(long parentId, long cuboidId, Object[] totalSum) {
         // double sum introduces error and causes result not exactly equal
         for (int i = 0; i < totalSum.length; i++) {
             if (totalSum[i] instanceof DoubleMutable) {
                 totalSum[i] = Math.round(((DoubleMutable) totalSum[i]).get());
+            } else if (totalSum[i] instanceof TopNCounter) {
+                TopNCounter counter = (TopNCounter) totalSum[i];
+                Iterator<Counter> iterator = counter.iterator();
+                double total = 0.0;
+                while (iterator.hasNext()) {
+                    Counter aCounter = iterator.next();
+                    total += aCounter.getCount();
+                }
+                totalSum[i] = Math.round(total);
             }
+            
         }
 
         if (totalSumForSanityCheck == null) {
@@ -518,6 +526,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             return;
         }
         if (Arrays.equals(totalSumForSanityCheck, totalSum) == false) {
+            logger.info("sanityCheck failed when calculate " + cuboidId + " from parent " + parentId);
+            logger.info("Expected: " + Arrays.toString(totalSumForSanityCheck));
+            logger.info("Actually: " + Arrays.toString(totalSum));
             throw new IllegalStateException();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 9050c49..193a05c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -167,7 +167,7 @@ public class GTAggregateScanner implements IGTScanner {
 
             // skip expensive aggregation
             for (int i = 0; i < totalSum.length; i++) {
-                if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator || totalSum[i] instanceof TopNAggregator )
+                if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator || totalSum[i] instanceof TopNAggregator)
                     totalSum[i] = null;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
index 4b35a7a..9c21f7b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -18,9 +18,14 @@
 
 package org.apache.kylin.metadata.measure;
 
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 
+import java.util.Iterator;
+import java.util.Map;
+
 /**
  * 
  */
@@ -29,6 +34,7 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
 
     int capacity = 0;
     TopNCounter<ByteArray> sum = null;
+    Map<ByteArray, Double> sanityCheckMap;
 
     @Override
     public void reset() {
@@ -39,9 +45,9 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
     public void aggregate(TopNCounter<ByteArray> value) {
         if (sum == null) {
             capacity = value.getCapacity();
-            sum = new TopNCounter<ByteArray>(capacity);
+            sum = new TopNCounter<>(capacity);
+            sanityCheckMap = Maps.newHashMap();
         }
-
         sum.merge(value);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
index d5dc43e..d542098 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
@@ -32,6 +32,8 @@ import com.google.common.collect.Maps;
 /**
  * @author yangli9
  * 
+ * Note: the implementations MUST be thread-safe.
+ * 
  */
 abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
index e0f293b..468d077 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -34,7 +34,7 @@ import java.util.List;
  */
 public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
 
-    private DoubleDeltaSerializer dds = new DoubleDeltaSerializer();
+    private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
 
     private int precision;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 18730ef..0b7aa70 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -109,7 +109,7 @@ public class DebugTomcat {
 
         String webBase = new File("../webapp/app").getAbsolutePath();
         if (new File(webBase, "WEB-INF").exists() == false) {
-            throw new RuntimeException("In order to launch Kylin web app from IDE, please copy server/src/main/webapp/WEB-INF to  webapp/app/WEB-INF");
+            throw new RuntimeException("In order to launch Kylin web app from IDE, please copy server/src/main/webapp/WEB-INF to  webapp/app/");
         }
 
         Tomcat tomcat = new Tomcat();