You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/20 07:02:53 UTC
[5/5] kylin git commit: KYLIN-1917 TopN counter merge performance improvement
KYLIN-1917 TopN counter merge performance improvement
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cec8b9ed
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cec8b9ed
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cec8b9ed
Branch: refs/heads/yang21
Commit: cec8b9ed946ec9bb61d4b532c37fb3a69740489c
Parents: 0b3b6f4
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 20 14:54:17 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 15:00:17 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/measure/topn/Counter.java | 17 +-
.../apache/kylin/measure/topn/TopNCounter.java | 236 +++++++------------
.../measure/topn/TopNCounterSerializer.java | 4 +-
.../kylin/measure/topn/TopNMeasureType.java | 2 +-
.../topn/TopNCounterSerializerTest.java | 2 +-
.../measure/topn/TopNCounterBasicTest.java | 2 +-
6 files changed, 98 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
index 041ea2b..cd5b825 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
@@ -31,6 +31,8 @@ import java.io.ObjectOutput;
public class Counter<T> implements Externalizable {
protected T item;
+
+
protected double count;
// protected double error;
@@ -42,10 +44,15 @@ public class Counter<T> implements Externalizable {
public Counter(T item) {
this.count = 0;
- // this.error = 0;
this.item = item;
}
+ public Counter(T item, double count) {
+ this.item = item;
+ this.count = count;
+ }
+
+
public T getItem() {
return item;
}
@@ -54,13 +61,11 @@ public class Counter<T> implements Externalizable {
return count;
}
- // public double getError() {
- // return error;
- // }
-
+ public void setCount(double count) {
+ this.count = count;
+ }
@Override
public String toString() {
- // return item + ":" + count + ':' + error;
return item + ":" + count;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index ab4b40e..cf9978a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,20 +19,22 @@
package org.apache.kylin.measure.topn;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kylin.common.util.Pair;
-
+import com.google.common.collect.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
* Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
- *
+ *
* Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
* data structure as described in:
* <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
@@ -45,30 +47,30 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
public static final int EXTRA_SPACE_RATE = 50;
protected int capacity;
- private HashMap<T, ListNode2<Counter<T>>> counterMap;
- protected DoublyLinkedList<Counter<T>> counterList;
+ private HashMap<T, Counter<T>> counterMap;
+ protected LinkedList<Counter<T>> counterList; // linked list of counters; after consolidate() the first element is the top (highest-count) one
+ private boolean ordered = true;
+ private boolean descending = true;
/**
* @param capacity maximum size (larger capacities improve accuracy)
*/
public TopNCounter(int capacity) {
this.capacity = capacity;
- counterMap = new HashMap<T, ListNode2<Counter<T>>>();
- counterList = new DoublyLinkedList<Counter<T>>();
+ counterMap = Maps.newHashMap();
+ counterList = Lists.newLinkedList();
}
public int getCapacity() {
return capacity;
}
- /**
- * Algorithm: <i>Space-Saving</i>
- *
- * @param item stream element (<i>e</i>)
- * @return false if item was already in the stream summary, true otherwise
- */
- public boolean offer(T item) {
- return offer(item, 1.0);
+ public LinkedList<Counter<T>> getCounterList() {
+ return counterList;
+ }
+
+ public void offer(T item) {
+ offer(item, 1.0);
}
/**
@@ -77,103 +79,35 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
* @param item stream element (<i>e</i>)
* @return false if item was already in the stream summary, true otherwise
*/
- public boolean offer(T item, double incrementCount) {
- return offerReturnAll(item, incrementCount).getFirst();
- }
-
- /**
- * @param item stream element (<i>e</i>)
- * @return item dropped from summary if an item was dropped, null otherwise
- */
- public T offerReturnDropped(T item, double incrementCount) {
- return offerReturnAll(item, incrementCount).getSecond();
- }
-
- /**
- * @param item stream element (<i>e</i>)
- * @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and itemDropped is null if no item was dropped
- */
- public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
- ListNode2<Counter<T>> counterNode = counterMap.get(item);
- boolean isNewItem = (counterNode == null);
- T droppedItem = null;
- if (isNewItem) {
-
- if (size() < capacity) {
- counterNode = counterList.enqueue(new Counter<T>(item));
- } else {
- counterNode = counterList.tail();
- Counter<T> counter = counterNode.getValue();
- droppedItem = counter.item;
- counterMap.remove(droppedItem);
- counter.item = item;
- counter.count = 0.0;
- }
+ public void offer(T item, double incrementCount) {
+ Counter<T> counterNode = counterMap.get(item);
+ if (counterNode == null) {
+ counterNode = new Counter<T>(item, incrementCount);
counterMap.put(item, counterNode);
- }
-
- incrementCounter(counterNode, incrementCount);
-
- return Pair.newPair(isNewItem, droppedItem);
- }
-
- protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
- Counter<T> counter = counterNode.getValue();
- counter.count += incrementCount;
-
- ListNode2<Counter<T>> nodeNext;
-
- if (incrementCount > 0) {
- nodeNext = counterNode.getNext();
- } else {
- nodeNext = counterNode.getPrev();
- }
- counterList.remove(counterNode);
- counterNode.prev = null;
- counterNode.next = null;
-
- if (incrementCount > 0) {
- while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
- nodeNext = nodeNext.getNext();
- }
- if (nodeNext != null) {
- counterList.addBefore(nodeNext, counterNode);
- } else {
- counterList.add(counterNode);
- }
-
+ counterList.add(counterNode);
} else {
- while (nodeNext != null && counter.count < nodeNext.getValue().count) {
- nodeNext = nodeNext.getPrev();
- }
- if (nodeNext != null) {
- counterList.addAfter(nodeNext, counterNode);
- } else {
- counterList.enqueue(counterNode);
- }
+ counterNode.setCount(counterNode.getCount() + incrementCount);
}
-
+ ordered = false;
}
- public List<T> peek(int k) {
- List<T> topK = new ArrayList<T>(k);
-
- for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
- Counter<T> b = bNode.getValue();
- if (topK.size() == k) {
- return topK;
- }
- topK.add(b.item);
- }
-
- return topK;
+ /**
+ * Resort and keep the expected size
+ */
+ public void consolidate() {
+ Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator);
+ retain(capacity);
+ ordered = true;
}
public List<Counter<T>> topK(int k) {
- List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
-
- for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
- Counter<T> b = bNode.getValue();
+ if (ordered == false) {
+ consolidate();
+ }
+ List<Counter<T>> topK = new ArrayList<>(k);
+ Iterator<Counter<T>> iterator = counterList.iterator();
+ while (iterator.hasNext()) {
+ Counter<T> b = iterator.next();
if (topK.size() == k) {
return topK;
}
@@ -194,8 +128,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append('[');
- for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
- Counter<T> b = bNode.getValue();
+ Iterator<Counter<T>> iterator = counterList.iterator();
+ while (iterator.hasNext()) {
+ Counter<T> b = iterator.next();
sb.append(b.item);
sb.append(':');
sb.append(b.count);
@@ -211,10 +146,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
* @param count
*/
public void offerToHead(T item, double count) {
- Counter<T> c = new Counter<T>(item);
- c.count = count;
- ListNode2<Counter<T>> node = counterList.add(c);
- counterMap.put(c.item, node);
+ Counter<T> c = new Counter<T>(item, count);
+ counterList.addFirst(c);
+ counterMap.put(c.item, c);
}
/**
@@ -225,19 +159,19 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
public TopNCounter<T> merge(TopNCounter<T> another) {
double m1 = 0.0, m2 = 0.0;
if (this.size() >= this.capacity) {
- m1 = this.counterList.tail().getValue().count;
+ m1 = this.counterList.getLast().count;
}
if (another.size() >= another.capacity) {
- m2 = another.counterList.tail().getValue().count;
+ m2 = another.counterList.getLast().count;
}
Set<T> duplicateItems = Sets.newHashSet();
List<T> notDuplicateItems = Lists.newArrayList();
- for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
+ for (Map.Entry<T, Counter<T>> entry : this.counterMap.entrySet()) {
T item = entry.getKey();
- ListNode2<Counter<T>> existing = another.counterMap.get(item);
+ Counter<T> existing = another.counterMap.get(item);
if (existing != null) {
duplicateItems.add(item);
} else {
@@ -246,21 +180,22 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
}
for (T item : duplicateItems) {
- this.offer(item, another.counterMap.get(item).getValue().count);
+ this.offer(item, another.counterMap.get(item).count);
}
for (T item : notDuplicateItems) {
this.offer(item, m2);
}
- for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
+ for (Map.Entry<T, Counter<T>> entry : another.counterMap.entrySet()) {
T item = entry.getKey();
if (duplicateItems.contains(item) == false) {
- double counter = entry.getValue().getValue().count;
+ double counter = entry.getValue().count;
this.offer(item, counter + m1);
}
}
+ this.consolidate();
return this;
}
@@ -271,13 +206,11 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
public void retain(int newCapacity) {
assert newCapacity > 0;
this.capacity = newCapacity;
- if (newCapacity < this.size()) {
- ListNode2<Counter<T>> tail = counterList.tail();
- while (tail != null && this.size() > newCapacity) {
- Counter<T> bucket = tail.getValue();
- this.counterMap.remove(bucket.getItem());
- this.counterList.remove(tail);
- tail = this.counterList.tail();
+ if (this.size() > newCapacity) {
+ Counter<T> toRemoved;
+ for (int i = 0, n = this.size() - newCapacity; i < n; i++) {
+ toRemoved = counterList.pollLast();
+ this.counterMap.remove(toRemoved.item);
}
}
@@ -291,10 +224,15 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
double[] counters = new double[size()];
int index = 0;
- for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
- Counter<T> b = bNode.getValue();
- counters[index] = b.count;
- index++;
+ if (this.descending == true) {
+ Iterator<Counter<T>> iterator = counterList.descendingIterator();
+ while (iterator.hasNext()) {
+ Counter<T> b = iterator.next();
+ counters[index] = b.count;
+ index++;
+ }
+ } else {
+ throw new IllegalStateException(); // support in future
}
assert index == size();
@@ -303,37 +241,27 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
@Override
public Iterator<Counter<T>> iterator() {
- return new TopNCounterIterator();
- }
-
- /**
- * Iterator from the tail (smallest) to head (biggest);
- */
- private class TopNCounterIterator implements Iterator<Counter<T>> {
-
- private ListNode2<Counter<T>> currentBNode;
-
- private TopNCounterIterator() {
- currentBNode = counterList.tail();
+ if (this.descending == true) {
+ return this.counterList.descendingIterator();
+ } else {
+ throw new IllegalStateException(); // support in future
}
+ }
+ private static final Comparator ASC_Comparator = new Comparator<Counter>() {
@Override
- public boolean hasNext() {
- return currentBNode != null;
-
+ public int compare(Counter o1, Counter o2) {
+ return o1.getCount() > o2.getCount() ? 1 : o1.getCount() == o2.getCount() ? 0 : -1;
}
- @Override
- public Counter<T> next() {
- Counter<T> counter = currentBNode.getValue();
- currentBNode = currentBNode.getNext();
- return counter;
- }
+ };
+ private static final Comparator DESC_Comparator = new Comparator<Counter>() {
@Override
- public void remove() {
- throw new UnsupportedOperationException();
+ public int compare(Counter o1, Counter o2) {
+ return o1.getCount() > o2.getCount() ? -1 : o1.getCount() == o2.getCount() ? 0 : 1;
}
- }
+
+ };
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index 604365c..071e2a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -65,8 +65,8 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
@Override
public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
double[] counters = value.getCounters();
- List<ByteArray> peek = value.peek(1);
- int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
+ List<Counter<ByteArray>> peek = value.topK(1);
+ int keyLength = peek.size() > 0 ? peek.get(0).getItem().length() : 0;
out.putInt(value.getCapacity());
out.putInt(value.size());
out.putInt(keyLength);
http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 11a260a..761c17f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -260,7 +260,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
};
}
- if (digest.aggregations.size() == 0 ) {
+ if (digest.aggregations.size() == 0) {
// directly query the UHC column without sorting
unmatchedDimensions.removeAll(literalCol);
return new CapabilityInfluence() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
index 7e7fd31..dedb4f5 100644
--- a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
@@ -55,7 +55,7 @@ public class TopNCounterSerializerTest extends LocalFileMetadataTestCase {
for (Integer i : stream) {
vs.offer(new ByteArray(Bytes.toBytes(i)));
}
-
+ vs.consolidate();
ByteBuffer out = ByteBuffer.allocate(1024);
serializer.serialize(vs, out);
http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
index cb92338..162ef01 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
@@ -44,7 +44,7 @@ public class TopNCounterBasicTest {
@Test
public void testTopK() {
- TopNCounter<String> vs = new TopNCounter<String>(3);
+ TopNCounter<String> vs = new TopNCounter<>(3);
String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
for (String i : stream) {
vs.offer(i);