You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/10/12 16:21:49 UTC

incubator-kylin git commit: KYLIN-1068 Optimize the memory footprint for TopN counter

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1068 [created] 207cd455d


KYLIN-1068 Optimize the memory footprint for TopN counter

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/207cd455
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/207cd455
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/207cd455

Branch: refs/heads/KYLIN-1068
Commit: 207cd455ddb9d582d90515fc93e928f04d41d3ee
Parents: 18940e0
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 12 22:05:32 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 12 22:05:32 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/topn/Counter.java   |   5 +-
 .../kylin/common/topn/DoublyLinkedList.java     |  18 ++
 .../apache/kylin/common/topn/TopNCounter.java   | 196 ++++++-------------
 .../kylin/common/topn/TopNCounterBasicTest.java |   2 +-
 .../common/topn/TopNCounterCombinationTest.java |   1 -
 5 files changed, 78 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 866d3d8..cde67e7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -31,8 +31,6 @@ import java.io.ObjectOutput;
  */
 public class Counter<T> implements Externalizable {
 
-    protected ListNode2<TopNCounter<T>.Bucket> bucketNode;
-
     protected T item;
     protected double count;
     protected double error;
@@ -43,8 +41,7 @@ public class Counter<T> implements Externalizable {
     public Counter() {
     }
 
-    public Counter(ListNode2<TopNCounter<T>.Bucket> bucket, T item) {
-        this.bucketNode = bucket;
+    public Counter(T item) {
         this.count = 0;
         this.error = 0;
         this.item = item;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
index 0942b84..bf0e582 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -54,6 +54,11 @@ public class DoublyLinkedList<T> implements Iterable<T> {
      */
     public ListNode2<T> enqueue(T value) {
         ListNode2<T> node = new ListNode2<T>(value);
+       
+        return enqueue(node);
+    }
+
+    public ListNode2<T> enqueue(ListNode2<T> node) {
         if (size++ == 0) {
             head = node;
         } else {
@@ -97,6 +102,19 @@ public class DoublyLinkedList<T> implements Iterable<T> {
         size++;
     }
 
+
+    public void addBefore(ListNode2<T> node, ListNode2<T> newNode) {
+        newNode.prev = node.prev;
+        newNode.next = node;
+        node.prev = newNode;
+        if (newNode.prev == null) {
+            tail = newNode;
+        } else {
+            newNode.prev.next = newNode;
+        }
+        size++;
+    }
+
     public void remove(ListNode2<T> node) {
         if (node == tail) {
             tail = node.next;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 6814b8d..6ce4a89 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -21,8 +21,6 @@ package org.apache.kylin.common.topn;
 import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.Pair;
 
-import java.io.*;
-import java.nio.ByteBuffer;
 import java.util.*;
 
 /**
@@ -36,29 +34,12 @@ import java.util.*;
  * @param <T> type of data in the stream to be summarized
  */
 public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
-    
-    public static final int EXTRA_SPACE_RATE = 50;
-
-
-    protected class Bucket {
-
-        protected DoublyLinkedList<Counter<T>> counterList;
 
-        private double count;
-
-        public Bucket(double count) {
-            this.count = count;
-            this.counterList = new DoublyLinkedList<Counter<T>>();
-        }
-
-        public int size() {
-            return counterList.size();
-        }
-    }
+    public static final int EXTRA_SPACE_RATE = 50;
 
     protected int capacity;
     private HashMap<T, ListNode2<Counter<T>>> counterMap;
-    protected DoublyLinkedList<Bucket> bucketList;
+    protected DoublyLinkedList<Counter<T>> bucketList;
 
     /**
      * @param capacity maximum size (larger capacities improve accuracy)
@@ -66,7 +47,7 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public TopNCounter(int capacity) {
         this.capacity = capacity;
         counterMap = new HashMap<T, ListNode2<Counter<T>>>();
-        bucketList = new DoublyLinkedList<Bucket>();
+        bucketList = new DoublyLinkedList<Counter<T>>();
     }
 
     public int getCapacity() {
@@ -114,15 +95,15 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
         if (isNewItem) {
 
             if (size() < capacity) {
-                counterNode = bucketList.enqueue(new Bucket(0)).getValue().counterList.add(new Counter<T>(bucketList.tail(), item));
+                counterNode = bucketList.enqueue(new Counter<T>(item));
             } else {
-                Bucket min = bucketList.first();
-                counterNode = min.counterList.tail();
+                counterNode = bucketList.tail();
                 Counter<T> counter = counterNode.getValue();
                 droppedItem = counter.item;
                 counterMap.remove(droppedItem);
                 counter.item = item;
-                counter.error = min.count;
+                counter.error = counter.count;
+                counter.count = 0.0;
             }
             counterMap.put(item, counterNode);
         }
@@ -133,56 +114,40 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     }
 
     protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
-        Counter<T> counter = counterNode.getValue(); // count_i
-        ListNode2<Bucket> oldNode = counter.bucketNode;
-        Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
-        bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
+        Counter<T> counter = counterNode.getValue();
         counter.count = counter.count + incrementCount;
 
-        // Finding the right bucket for count_i
-        // Because we allow a single call to increment count more than once, this may not be the adjacent bucket. 
-        ListNode2<Bucket> bucketNodePrev = oldNode;
-        ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
+        ListNode2<Counter<T>> bucketNodeNext = counterNode.getNext();
+        bucketList.remove(counterNode);
+        counterNode.prev = null;
+        counterNode.next = null;
         while (bucketNodeNext != null) {
-            Bucket bucketNext = bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
-            if (counter.count == bucketNext.count) {
-                bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
-                break;
-            } else if (counter.count > bucketNext.count) {
-                bucketNodePrev = bucketNodeNext;
-                bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
+            Counter<T> bucketNext = bucketNodeNext.getValue(); 
+            if (counter.count >= bucketNext.count) {
+                bucketNodeNext = bucketNodeNext.getNext(); 
             } else {
-                // A new bucket has to be created
-                bucketNodeNext = null;
+                break;
             }
         }
 
-        if (bucketNodeNext == null) {
-            Bucket bucketNext = new Bucket(counter.count);
-            bucketNext.counterList.add(counterNode);
-            bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
+        if (bucketNodeNext != null) {
+            bucketList.addBefore(bucketNodeNext, counterNode);
+        } else {
+            bucketList.add(counterNode);
         }
-        counter.bucketNode = bucketNodeNext;
 
-        //Cleaning up
-        if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
-        {
-            bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
-        }
     }
 
     @Override
     public List<T> peek(int k) {
         List<T> topK = new ArrayList<T>(k);
 
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                if (topK.size() == k) {
-                    return topK;
-                }
-                topK.add(c.item);
+        for (ListNode2<Counter<T>> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            if (topK.size() == k) {
+                return topK;
             }
+            topK.add(b.item);
         }
 
         return topK;
@@ -191,14 +156,12 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public List<Counter<T>> topK(int k) {
         List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
 
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                if (topK.size() == k) {
-                    return topK;
-                }
-                topK.add(c);
+        for (ListNode2<Counter<T>> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            if (topK.size() == k) {
+                return topK;
             }
+            topK.add(b);
         }
 
         return topK;
@@ -215,47 +178,27 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append('[');
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Bucket b = bNode.getValue();
-            sb.append('{');
+        for (ListNode2<Counter<T>> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            sb.append(b.item);
+            sb.append(':');
             sb.append(b.count);
-            sb.append(":[");
-            for (Counter<T> c : b.counterList) {
-                sb.append('{');
-                sb.append(c.item);
-                sb.append(':');
-                sb.append(c.error);
-                sb.append("},");
-            }
-            if (b.counterList.size() > 0) {
-                sb.deleteCharAt(sb.length() - 1);
-            }
-            sb.append("]},");
-        }
-        if (bucketList.size() > 0) {
-            sb.deleteCharAt(sb.length() - 1);
         }
         sb.append(']');
         return sb.toString();
     }
 
     public void fromExternal(int size, double[] counters, List<T> items) {
-        this.bucketList = new DoublyLinkedList<Bucket>();
+        this.bucketList = new DoublyLinkedList<Counter<T>>();
 
         this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
 
-        Bucket currentBucket = null;
-        ListNode2<Bucket> currentBucketNode = null;
         for (int i = 0; i < size; i++) {
             Counter<T> c = new Counter<T>();
             c.count = counters[i];
             c.item = items.get(i);
-            if (currentBucket == null || c.count != currentBucket.count) {
-                currentBucket = new Bucket(c.count);
-                currentBucketNode = bucketList.add(currentBucket);
-            }
-            c.bucketNode = currentBucketNode;
-            counterMap.put(c.item, currentBucket.counterList.add(c));
+            ListNode2<Counter<T>> node = bucketList.add(c);
+            counterMap.put(c.item, node);
         }
     }
 
@@ -313,17 +256,13 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
         assert newCapacity > 0;
         this.capacity = newCapacity;
         if (newCapacity < this.size()) {
-            ListNode2<Bucket> tail = bucketList.tail;
+            ListNode2<Counter<T>> tail = bucketList.tail;
             while (tail != null && this.size() > newCapacity) {
-                Bucket bucket = tail.getValue();
-
-                for (Counter<T> counter : bucket.counterList) {
-                    this.counterMap.remove(counter.getItem());
-                }
-                tail = tail.getNext();
+                Counter<T> bucket = bucketList.tail.getValue();
+                this.counterMap.remove(bucket.getItem());
+                this.bucketList.remove(tail);
+                tail = this.bucketList.tail;
             }
-
-            tail.next = null;
         }
 
     }
@@ -336,12 +275,10 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
         double[] counters = new double[size()];
         int index = 0;
 
-        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                counters[index] = c.count;
-                index ++;
-            }
+        for (ListNode2<Counter<T>> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
+            Counter<T> b = bNode.getValue();
+            counters[index] = b.count;
+            index++;
         }
 
         return counters;
@@ -353,11 +290,9 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
      */
     public List<T> getItems() {
         List<T> items = Lists.newArrayList();
-        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                items.add(c.item);
-            }
+        for (ListNode2<Counter<T>> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
+            Counter<T> b = bNode.getValue();
+            items.add(b.item);
         }
 
         return items;
@@ -368,46 +303,31 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     public Iterator<Counter<T>> iterator() {
         return new TopNCounterIterator();
     }
-    
+
     private class TopNCounterIterator implements Iterator {
 
-        private ListNode2<Bucket> currentBNode;
-        private Iterator<Counter<T>> currentCounterIterator;
-        
+        private ListNode2<Counter<T>> currentBNode;
+
         private TopNCounterIterator() {
             currentBNode = bucketList.head();
-            if (currentBNode != null && currentBNode.getValue() != null) {
-                currentCounterIterator = currentBNode.getValue().counterList.iterator();
-            }
         }
-        
+
         @Override
         public boolean hasNext() {
-            if (currentCounterIterator == null) {
-                return false;
-            }
-            
-            if (currentCounterIterator.hasNext()) {
-                return true;
-            }
+            return currentBNode != null;
 
-            currentBNode = currentBNode.getPrev();
-            
-            if (currentBNode == null)
-                return false;
-
-            currentCounterIterator = currentBNode.getValue().counterList.iterator();
-            return hasNext();
         }
 
         @Override
         public Counter<T> next() {
-            return currentCounterIterator.next();
+            Counter<T> counter = currentBNode.getValue();
+            currentBNode = currentBNode.getPrev();
+            return counter;
         }
 
         @Override
         public void remove() {
             throw new UnsupportedOperationException();
         }
-    } 
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 252e955..a10851f 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -110,7 +110,7 @@ public class TopNCounterBasicTest {
     public void testMerge() {
 
         TopNCounter<String> vs = new TopNCounter<String>(10);
-        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B" };
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B", "A" };
         for (String i : stream) {
             vs.offer(i);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index 3d809df..699ea05 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -29,7 +29,6 @@ public class TopNCounterCombinationTest extends TopNCounterTest {
 
     @Parameterized.Parameters
     public static Collection<Integer[]> configs() {
-        //       return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
         return Arrays.asList(new Integer[][] {
                 // with 20X space
                 { 10, 20 }, // top 10%