You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/10/12 16:21:49 UTC
incubator-kylin git commit: KYLIN-1068 Optimize the memory footprint
for TopN counter
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-1068 [created] 207cd455d
KYLIN-1068 Optimize the memory footprint for TopN counter
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/207cd455
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/207cd455
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/207cd455
Branch: refs/heads/KYLIN-1068
Commit: 207cd455ddb9d582d90515fc93e928f04d41d3ee
Parents: 18940e0
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 12 22:05:32 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 12 22:05:32 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/topn/Counter.java | 5 +-
.../kylin/common/topn/DoublyLinkedList.java | 18 ++
.../apache/kylin/common/topn/TopNCounter.java | 196 ++++++-------------
.../kylin/common/topn/TopNCounterBasicTest.java | 2 +-
.../common/topn/TopNCounterCombinationTest.java | 1 -
5 files changed, 78 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 866d3d8..cde67e7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -31,8 +31,6 @@ import java.io.ObjectOutput;
*/
public class Counter<T> implements Externalizable {
- protected ListNode2<TopNCounter<T>.Bucket> bucketNode;
-
protected T item;
protected double count;
protected double error;
@@ -43,8 +41,7 @@ public class Counter<T> implements Externalizable {
public Counter() {
}
- public Counter(ListNode2<TopNCounter<T>.Bucket> bucket, T item) {
- this.bucketNode = bucket;
+ public Counter(T item) {
this.count = 0;
this.error = 0;
this.item = item;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
index 0942b84..bf0e582 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -54,6 +54,11 @@ public class DoublyLinkedList<T> implements Iterable<T> {
*/
public ListNode2<T> enqueue(T value) {
ListNode2<T> node = new ListNode2<T>(value);
+
+ return enqueue(node);
+ }
+
+ public ListNode2<T> enqueue(ListNode2<T> node) {
if (size++ == 0) {
head = node;
} else {
@@ -97,6 +102,19 @@ public class DoublyLinkedList<T> implements Iterable<T> {
size++;
}
+
+ public void addBefore(ListNode2<T> node, ListNode2<T> newNode) {
+ newNode.prev = node.prev;
+ newNode.next = node;
+ node.prev = newNode;
+ if (newNode.prev == null) {
+ tail = newNode;
+ } else {
+ newNode.prev.next = newNode;
+ }
+ size++;
+ }
+
public void remove(ListNode2<T> node) {
if (node == tail) {
tail = node.next;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 6814b8d..6ce4a89 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -21,8 +21,6 @@ package org.apache.kylin.common.topn;
import com.google.common.collect.Lists;
import org.apache.kylin.common.util.Pair;
-import java.io.*;
-import java.nio.ByteBuffer;
import java.util.*;
/**
@@ -36,29 +34,12 @@ import java.util.*;
* @param <T> type of data in the stream to be summarized
*/
public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
-
- public static final int EXTRA_SPACE_RATE = 50;
-
-
- protected class Bucket {
-
- protected DoublyLinkedList<Counter<T>> counterList;
- private double count;
-
- public Bucket(double count) {
- this.count = count;
- this.counterList = new DoublyLinkedList<Counter<T>>();
- }
-
- public int size() {
- return counterList.size();
- }
- }
+ public static final int EXTRA_SPACE_RATE = 50;
protected int capacity;
private HashMap<T, ListNode2<Counter<T>>> counterMap;
- protected DoublyLinkedList<Bucket> bucketList;
+ protected DoublyLinkedList<Counter<T>> bucketList;
/**
* @param capacity maximum size (larger capacities improve accuracy)
@@ -66,7 +47,7 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public TopNCounter(int capacity) {
this.capacity = capacity;
counterMap = new HashMap<T, ListNode2<Counter<T>>>();
- bucketList = new DoublyLinkedList<Bucket>();
+ bucketList = new DoublyLinkedList<Counter<T>>();
}
public int getCapacity() {
@@ -114,15 +95,15 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
if (isNewItem) {
if (size() < capacity) {
- counterNode = bucketList.enqueue(new Bucket(0)).getValue().counterList.add(new Counter<T>(bucketList.tail(), item));
+ counterNode = bucketList.enqueue(new Counter<T>(item));
} else {
- Bucket min = bucketList.first();
- counterNode = min.counterList.tail();
+ counterNode = bucketList.tail();
Counter<T> counter = counterNode.getValue();
droppedItem = counter.item;
counterMap.remove(droppedItem);
counter.item = item;
- counter.error = min.count;
+ counter.error = counter.count;
+ counter.count = 0.0;
}
counterMap.put(item, counterNode);
}
@@ -133,56 +114,40 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
}
protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
- Counter<T> counter = counterNode.getValue(); // count_i
- ListNode2<Bucket> oldNode = counter.bucketNode;
- Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
- bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
+ Counter<T> counter = counterNode.getValue();
counter.count = counter.count + incrementCount;
- // Finding the right bucket for count_i
- // Because we allow a single call to increment count more than once, this may not be the adjacent bucket.
- ListNode2<Bucket> bucketNodePrev = oldNode;
- ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
+ ListNode2<Counter<T>> bucketNodeNext = counterNode.getNext();
+ bucketList.remove(counterNode);
+ counterNode.prev = null;
+ counterNode.next = null;
while (bucketNodeNext != null) {
- Bucket bucketNext = bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
- if (counter.count == bucketNext.count) {
- bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
- break;
- } else if (counter.count > bucketNext.count) {
- bucketNodePrev = bucketNodeNext;
- bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
+ Counter<T> bucketNext = bucketNodeNext.getValue();
+ if (counter.count >= bucketNext.count) {
+ bucketNodeNext = bucketNodeNext.getNext();
} else {
- // A new bucket has to be created
- bucketNodeNext = null;
+ break;
}
}
- if (bucketNodeNext == null) {
- Bucket bucketNext = new Bucket(counter.count);
- bucketNext.counterList.add(counterNode);
- bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
+ if (bucketNodeNext != null) {
+ bucketList.addBefore(bucketNodeNext, counterNode);
+ } else {
+ bucketList.add(counterNode);
}
- counter.bucketNode = bucketNodeNext;
- //Cleaning up
- if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
- {
- bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
- }
}
@Override
public List<T> peek(int k) {
List<T> topK = new ArrayList<T>(k);
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- if (topK.size() == k) {
- return topK;
- }
- topK.add(c.item);
+ for (ListNode2<Counter<T>> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Counter<T> b = bNode.getValue();
+ if (topK.size() == k) {
+ return topK;
}
+ topK.add(b.item);
}
return topK;
@@ -191,14 +156,12 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public List<Counter<T>> topK(int k) {
List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- if (topK.size() == k) {
- return topK;
- }
- topK.add(c);
+ for (ListNode2<Counter<T>> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Counter<T> b = bNode.getValue();
+ if (topK.size() == k) {
+ return topK;
}
+ topK.add(b);
}
return topK;
@@ -215,47 +178,27 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append('[');
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
- Bucket b = bNode.getValue();
- sb.append('{');
+ for (ListNode2<Counter<T>> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Counter<T> b = bNode.getValue();
+ sb.append(b.item);
+ sb.append(':');
sb.append(b.count);
- sb.append(":[");
- for (Counter<T> c : b.counterList) {
- sb.append('{');
- sb.append(c.item);
- sb.append(':');
- sb.append(c.error);
- sb.append("},");
- }
- if (b.counterList.size() > 0) {
- sb.deleteCharAt(sb.length() - 1);
- }
- sb.append("]},");
- }
- if (bucketList.size() > 0) {
- sb.deleteCharAt(sb.length() - 1);
}
sb.append(']');
return sb.toString();
}
public void fromExternal(int size, double[] counters, List<T> items) {
- this.bucketList = new DoublyLinkedList<Bucket>();
+ this.bucketList = new DoublyLinkedList<Counter<T>>();
this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
- Bucket currentBucket = null;
- ListNode2<Bucket> currentBucketNode = null;
for (int i = 0; i < size; i++) {
Counter<T> c = new Counter<T>();
c.count = counters[i];
c.item = items.get(i);
- if (currentBucket == null || c.count != currentBucket.count) {
- currentBucket = new Bucket(c.count);
- currentBucketNode = bucketList.add(currentBucket);
- }
- c.bucketNode = currentBucketNode;
- counterMap.put(c.item, currentBucket.counterList.add(c));
+ ListNode2<Counter<T>> node = bucketList.add(c);
+ counterMap.put(c.item, node);
}
}
@@ -313,17 +256,13 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
assert newCapacity > 0;
this.capacity = newCapacity;
if (newCapacity < this.size()) {
- ListNode2<Bucket> tail = bucketList.tail;
+ ListNode2<Counter<T>> tail = bucketList.tail;
while (tail != null && this.size() > newCapacity) {
- Bucket bucket = tail.getValue();
-
- for (Counter<T> counter : bucket.counterList) {
- this.counterMap.remove(counter.getItem());
- }
- tail = tail.getNext();
+ Counter<T> bucket = bucketList.tail.getValue();
+ this.counterMap.remove(bucket.getItem());
+ this.bucketList.remove(tail);
+ tail = this.bucketList.tail;
}
-
- tail.next = null;
}
}
@@ -336,12 +275,10 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
double[] counters = new double[size()];
int index = 0;
- for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- counters[index] = c.count;
- index ++;
- }
+ for (ListNode2<Counter<T>> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
+ Counter<T> b = bNode.getValue();
+ counters[index] = b.count;
+ index++;
}
return counters;
@@ -353,11 +290,9 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
*/
public List<T> getItems() {
List<T> items = Lists.newArrayList();
- for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- items.add(c.item);
- }
+ for (ListNode2<Counter<T>> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
+ Counter<T> b = bNode.getValue();
+ items.add(b.item);
}
return items;
@@ -368,46 +303,31 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public Iterator<Counter<T>> iterator() {
return new TopNCounterIterator();
}
-
+
private class TopNCounterIterator implements Iterator {
- private ListNode2<Bucket> currentBNode;
- private Iterator<Counter<T>> currentCounterIterator;
-
+ private ListNode2<Counter<T>> currentBNode;
+
private TopNCounterIterator() {
currentBNode = bucketList.head();
- if (currentBNode != null && currentBNode.getValue() != null) {
- currentCounterIterator = currentBNode.getValue().counterList.iterator();
- }
}
-
+
@Override
public boolean hasNext() {
- if (currentCounterIterator == null) {
- return false;
- }
-
- if (currentCounterIterator.hasNext()) {
- return true;
- }
+ return currentBNode != null;
- currentBNode = currentBNode.getPrev();
-
- if (currentBNode == null)
- return false;
-
- currentCounterIterator = currentBNode.getValue().counterList.iterator();
- return hasNext();
}
@Override
public Counter<T> next() {
- return currentCounterIterator.next();
+ Counter<T> counter = currentBNode.getValue();
+ currentBNode = currentBNode.getPrev();
+ return counter;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 252e955..a10851f 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -110,7 +110,7 @@ public class TopNCounterBasicTest {
public void testMerge() {
TopNCounter<String> vs = new TopNCounter<String>(10);
- String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B" };
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B", "A" };
for (String i : stream) {
vs.offer(i);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/207cd455/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index 3d809df..699ea05 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -29,7 +29,6 @@ public class TopNCounterCombinationTest extends TopNCounterTest {
@Parameterized.Parameters
public static Collection<Integer[]> configs() {
- // return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
return Arrays.asList(new Integer[][] {
// with 20X space
{ 10, 20 }, // top 10%