You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/17 08:10:46 UTC
[2/7] incubator-kylin git commit: KYLIN-943 initial commit for
function and performance test
KYLIN-943 initial commit for function and performance test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7c501760
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7c501760
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7c501760
Branch: refs/heads/KYLIN-943
Commit: 7c501760050e0d9a9160442ecf87dca554ededfa
Parents: 4456bb1
Author: shaofengshi <sh...@apache.org>
Authored: Mon Aug 24 10:05:27 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:20 2015 +0800
----------------------------------------------------------------------
core-common/pom.xml | 4 +
.../org/apache/kylin/common/topn/Counter.java | 85 +++++
.../kylin/common/topn/DoublyLinkedList.java | 188 +++++++++++
.../org/apache/kylin/common/topn/ITopK.java | 53 +++
.../org/apache/kylin/common/topn/ListNode2.java | 51 +++
.../apache/kylin/common/topn/TopNCounter.java | 336 +++++++++++++++++++
.../common/topn/TopNCounterComparisonTest.java | 281 ++++++++++++++++
.../kylin/common/topn/TopNCounterTest.java | 174 ++++++++++
pom.xml | 6 +
9 files changed, 1178 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 577db42..f9e715d 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -65,6 +65,10 @@
<artifactId>commons-email</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
new file mode 100644
index 0000000..0f7d8de
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Modified from Counter.java in https://github.com/addthis/stream-lib
+ *
+ * @param <T>
+ */
+public class Counter<T> implements Externalizable {
+
+ protected ListNode2<TopNCounter<T>.Bucket> bucketNode;
+
+ protected T item;
+ protected double count;
+ protected double error;
+
+ /**
+ * For de-serialization
+ */
+ public Counter() {
+ }
+
+ public Counter(ListNode2<TopNCounter<T>.Bucket> bucket, T item) {
+ this.bucketNode = bucket;
+ this.count = 0;
+ this.error = 0;
+ this.item = item;
+ }
+
+ public T getItem() {
+ return item;
+ }
+
+ public double getCount() {
+ return count;
+ }
+
+ public double getError() {
+ return error;
+ }
+
+ @Override
+ public String toString() {
+ return item + ":" + count + ':' + error;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ item = (T) in.readObject();
+ count = in.readDouble();
+ error = in.readDouble();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(item);
+ out.writeDouble(count);
+ out.writeDouble(error);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
new file mode 100644
index 0000000..0942b84
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Modified from DoublyLinkedList.java in https://github.com/addthis/stream-lib
+ *
+ * @param <T>
+ */
+public class DoublyLinkedList<T> implements Iterable<T> {
+
+    /** Number of nodes currently linked into the list. */
+    protected int size;
+    /** First node in iteration order; iteration follows {@code next} links from here. */
+    protected ListNode2<T> tail;
+    /** Last node in iteration order; {@link #add(Object)} appends after it. */
+    protected ListNode2<T> head;
+
+    /**
+     * Appends {@code value} at the head end (it becomes the last element iterated).
+     *
+     * @return the newly created node
+     */
+    public ListNode2<T> add(T value) {
+        ListNode2<T> node = new ListNode2<T>(value);
+        if (size++ == 0) {
+            tail = node;
+        } else {
+            node.prev = head;
+            head.next = node;
+        }
+
+        head = node;
+
+        return node;
+    }
+
+    /**
+     * Prepends {@code value} at the tail end (it becomes the first element iterated).
+     *
+     * @return the newly created node
+     */
+    public ListNode2<T> enqueue(T value) {
+        ListNode2<T> node = new ListNode2<T>(value);
+        if (size++ == 0) {
+            head = node;
+        } else {
+            node.next = tail;
+            tail.prev = node;
+        }
+
+        tail = node;
+
+        return node;
+    }
+
+    /**
+     * Appends an existing node at the head end; the node's previous links are overwritten.
+     */
+    public void add(ListNode2<T> node) {
+        node.prev = head;
+        node.next = null;
+
+        if (size++ == 0) {
+            tail = node;
+        } else {
+            head.next = node;
+        }
+
+        head = node;
+    }
+
+    /**
+     * Inserts {@code value} directly after {@code node} (towards the head end).
+     *
+     * @return the newly created node
+     */
+    public ListNode2<T> addAfter(ListNode2<T> node, T value) {
+        ListNode2<T> newNode = new ListNode2<T>(value);
+        addAfter(node, newNode);
+        return newNode;
+    }
+
+    /**
+     * Links {@code newNode} directly after {@code node}, which must already belong to this list.
+     */
+    public void addAfter(ListNode2<T> node, ListNode2<T> newNode) {
+        newNode.next = node.next;
+        newNode.prev = node;
+        node.next = newNode;
+        if (newNode.next == null) {
+            head = newNode;
+        } else {
+            newNode.next.prev = newNode;
+        }
+        size++;
+    }
+
+    /**
+     * Unlinks {@code node}, which must currently belong to this list. The removed node's own
+     * {@code prev}/{@code next} fields are left as they were.
+     */
+    public void remove(ListNode2<T> node) {
+        if (node == tail) {
+            tail = node.next;
+        } else {
+            node.prev.next = node.next;
+        }
+
+        if (node == head) {
+            head = node.prev;
+        } else {
+            node.next.prev = node.prev;
+        }
+        size--;
+    }
+
+    public int size() {
+        return size;
+    }
+
+
+    /**
+     * @return an iterator from tail to head; it fails fast if the list size changes during iteration
+     */
+    @Override
+    public Iterator<T> iterator() {
+        return new DoublyLinkedListIterator(this);
+    }
+
+    protected class DoublyLinkedListIterator implements Iterator<T> {
+
+        protected DoublyLinkedList<T> list;
+        protected ListNode2<T> itr;
+        /** List size captured at creation; only size changes are detected as concurrent modification. */
+        protected int length;
+
+        public DoublyLinkedListIterator(DoublyLinkedList<T> list) {
+            this.length = list.size;
+            this.list = list;
+            this.itr = list.tail;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return itr != null;
+        }
+
+        @Override
+        public T next() {
+            if (length != list.size) {
+                throw new ConcurrentModificationException();
+            }
+            if (itr == null) {
+                // Iterator contract: an exhausted iterator throws NoSuchElementException, not NPE.
+                throw new NoSuchElementException();
+            }
+            T next = itr.value;
+            itr = itr.next;
+            return next;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    /** @return the value at the tail (first in iteration order), or null if empty */
+    public T first() {
+        return tail == null ? null : tail.getValue();
+    }
+
+    /** @return the value at the head (last in iteration order), or null if empty */
+    public T last() {
+        return head == null ? null : head.getValue();
+    }
+
+    public ListNode2<T> head() {
+        return head;
+    }
+
+    public ListNode2<T> tail() {
+        return tail;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    /**
+     * Copies the values in iteration order (tail to head).
+     * <p>
+     * NOTE: the returned array's runtime type is {@code Object[]}; assigning it to a concrete
+     * array type such as {@code String[]} throws {@link ClassCastException}.
+     */
+    @SuppressWarnings("unchecked")
+    public T[] toArray() {
+        T[] a = (T[]) new Object[size];
+        int i = 0;
+        for (T v : this) {
+            a[i++] = v;
+        }
+        return a;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java b/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
new file mode 100644
index 0000000..36603b7
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import java.util.List;
+
+/**
+ * Modified from https://github.com/addthis/stream-lib
+ *
+ * @param <T>
+ */
+public interface ITopK<T> {
+
+    /**
+     * Records a single occurrence of {@code element}.
+     *
+     * @param element the element observed in the stream
+     * @return {@code false} if the element was already in the top
+     */
+    boolean offer(T element);
+
+    /**
+     * Records {@code incrementCount} occurrences of {@code element}.
+     *
+     * @param element        the element observed in the stream
+     * @param incrementCount amount to add to the element's count
+     * @return {@code false} if the element was already in the top
+     */
+    boolean offer(T element, double incrementCount);
+
+    /**
+     * Returns the most frequent elements offered so far.
+     *
+     * @param k maximum number of elements to return
+     * @return up to {@code k} top elements (the result may be an approximation)
+     */
+    List<T> peek(int k);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java b/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
new file mode 100644
index 0000000..92f5f57
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+/**
+ * Modified from ListNode2.java in https://github.com/addthis/stream-lib
+ *
+ * @param <T>
+ */
+public class ListNode2<T> {
+
+    /** Payload carried by this node. */
+    protected T value;
+    /** Neighbour on the {@code prev} side, or {@code null} if none. */
+    protected ListNode2<T> prev;
+    /** Neighbour on the {@code next} side, or {@code null} if none. */
+    protected ListNode2<T> next;
+
+    /**
+     * Creates an unlinked node.
+     *
+     * @param value payload to store
+     */
+    public ListNode2(T value) {
+        this.value = value;
+    }
+
+    /** @return the stored payload */
+    public T getValue() {
+        return value;
+    }
+
+    /** @param value new payload to store */
+    public void setValue(T value) {
+        this.value = value;
+    }
+
+    /** @return the node on the {@code prev} side, or {@code null} */
+    public ListNode2<T> getPrev() {
+        return prev;
+    }
+
+    /** @return the node on the {@code next} side, or {@code null} */
+    public ListNode2<T> getNext() {
+        return next;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
new file mode 100644
index 0000000..0a45d0b
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.util.Pair;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
+ *
+ * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
+ * data structure as described in:
+ * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
+ * by Metwally, Agrawal, and Abbadi
+ *
+ * @param <T> type of data in the stream to be summarized
+ */
+public class TopNCounter<T> implements ITopK<T>, Externalizable {
+
+ protected class Bucket {
+
+ protected DoublyLinkedList<Counter<T>> counterList;
+
+ private double count;
+
+ public Bucket(double count) {
+ this.count = count;
+ this.counterList = new DoublyLinkedList<Counter<T>>();
+ }
+ }
+
+ protected int capacity;
+ private HashMap<T, ListNode2<Counter<T>>> counterMap;
+ protected DoublyLinkedList<Bucket> bucketList;
+
+ /**
+ * @param capacity maximum size (larger capacities improve accuracy)
+ */
+ public TopNCounter(int capacity) {
+ this.capacity = capacity;
+ counterMap = new HashMap<T, ListNode2<Counter<T>>>();
+ bucketList = new DoublyLinkedList<Bucket>();
+ }
+
+ public int getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * Algorithm: <i>Space-Saving</i>
+ *
+ * @param item stream element (<i>e</i>)
+ * @return false if item was already in the stream summary, true otherwise
+ */
+ @Override
+ public boolean offer(T item) {
+ return offer(item, 1.0);
+ }
+
+ /**
+ * Algorithm: <i>Space-Saving</i>
+ *
+ * @param item stream element (<i>e</i>)
+ * @return false if item was already in the stream summary, true otherwise
+ */
+ @Override
+ public boolean offer(T item, double incrementCount) {
+ return offerReturnAll(item, incrementCount).getFirst();
+ }
+
+ /**
+ * @param item stream element (<i>e</i>)
+ * @return item dropped from summary if an item was dropped, null otherwise
+ */
+ public T offerReturnDropped(T item, int incrementCount) {
+ return offerReturnAll(item, incrementCount).getSecond();
+ }
+
+ /**
+ * @param item stream element (<i>e</i>)
+ * @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and itemDropped is null if no item was dropped
+ */
+ public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
+ ListNode2<Counter<T>> counterNode = counterMap.get(item);
+ boolean isNewItem = (counterNode == null);
+ T droppedItem = null;
+ if (isNewItem) {
+
+ if (size() < capacity) {
+ counterNode = bucketList.enqueue(new Bucket(0)).getValue().counterList.add(new Counter<T>(bucketList.tail(), item));
+ } else {
+ Bucket min = bucketList.first();
+ counterNode = min.counterList.tail();
+ Counter<T> counter = counterNode.getValue();
+ droppedItem = counter.item;
+ counterMap.remove(droppedItem);
+ counter.item = item;
+ counter.error = min.count;
+ }
+ counterMap.put(item, counterNode);
+ }
+
+ incrementCounter(counterNode, incrementCount);
+
+ return new Pair<Boolean, T>(isNewItem, droppedItem);
+ }
+
+ protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
+ Counter<T> counter = counterNode.getValue(); // count_i
+ ListNode2<Bucket> oldNode = counter.bucketNode;
+ Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
+ bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
+ counter.count = counter.count + incrementCount;
+
+ // Finding the right bucket for count_i
+ // Because we allow a single call to increment count more than once, this may not be the adjacent bucket.
+ ListNode2<Bucket> bucketNodePrev = oldNode;
+ ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
+ while (bucketNodeNext != null) {
+ Bucket bucketNext = bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
+ if (counter.count == bucketNext.count) {
+ bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
+ break;
+ } else if (counter.count > bucketNext.count) {
+ bucketNodePrev = bucketNodeNext;
+ bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
+ } else {
+ // A new bucket has to be created
+ bucketNodeNext = null;
+ }
+ }
+
+ if (bucketNodeNext == null) {
+ Bucket bucketNext = new Bucket(counter.count);
+ bucketNext.counterList.add(counterNode);
+ bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
+ }
+ counter.bucketNode = bucketNodeNext;
+
+ //Cleaning up
+ if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
+ {
+ bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
+ }
+ }
+
+ @Override
+ public List<T> peek(int k) {
+ List<T> topK = new ArrayList<T>(k);
+
+ for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Bucket b = bNode.getValue();
+ for (Counter<T> c : b.counterList) {
+ if (topK.size() == k) {
+ return topK;
+ }
+ topK.add(c.item);
+ }
+ }
+
+ return topK;
+ }
+
+ public List<Counter<T>> topK(int k) {
+ List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
+
+ for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Bucket b = bNode.getValue();
+ for (Counter<T> c : b.counterList) {
+ if (topK.size() == k) {
+ return topK;
+ }
+ topK.add(c);
+ }
+ }
+
+ return topK;
+ }
+
+ /**
+ * @return number of items stored
+ */
+ public int size() {
+ return counterMap.size();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Bucket b = bNode.getValue();
+ sb.append('{');
+ sb.append(b.count);
+ sb.append(":[");
+ for (Counter<T> c : b.counterList) {
+ sb.append('{');
+ sb.append(c.item);
+ sb.append(':');
+ sb.append(c.error);
+ sb.append("},");
+ }
+ if (b.counterList.size() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ sb.append("]},");
+ }
+ if (bucketList.size() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ sb.append(']');
+ return sb.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ this.bucketList = new DoublyLinkedList<Bucket>();
+ this.capacity = in.readInt();
+
+ int size = in.readInt();
+ this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
+
+ Bucket currentBucket = null;
+ ListNode2<Bucket> currentBucketNode = null;
+ for (int i = 0; i < size; i++) {
+ Counter<T> c = (Counter<T>) in.readObject();
+ if (currentBucket == null || c.count != currentBucket.count) {
+ currentBucket = new Bucket(c.count);
+ currentBucketNode = bucketList.add(currentBucket);
+ }
+ c.bucketNode = currentBucketNode;
+ counterMap.put(c.item, currentBucket.counterList.add(c));
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(this.capacity);
+ out.writeInt(this.size());
+ for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
+ Bucket b = bNode.getValue();
+ for (Counter<T> c : b.counterList) {
+ out.writeObject(c);
+ }
+ }
+ }
+
+ /**
+ * For de-serialization
+ */
+ public TopNCounter() {
+ }
+
+ /**
+ * For de-serialization
+ *
+ * @param bytes
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ */
+ public TopNCounter(byte[] bytes) throws IOException, ClassNotFoundException {
+ fromBytes(bytes);
+ }
+
+ public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
+ readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
+ }
+
+ public byte[] toBytes() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(baos);
+ this.writeExternal(out);
+ out.flush();
+ return baos.toByteArray();
+
+ }
+
+ public TopNCounter<T> merge(TopNCounter<T> another) throws IOException, ClassNotFoundException {
+ TopNCounter<T> secondCounter = new TopNCounter<T>(another.capacity);
+ secondCounter.fromBytes(another.toBytes());
+ double m1 = 0.0, m2 = 0.0;
+ if (this.size() >= this.capacity) {
+ m1 = this.bucketList.tail().getValue().count;
+ }
+
+ if (secondCounter.size() >= secondCounter.capacity) {
+ m2 = secondCounter.bucketList.tail().getValue().count;
+ }
+
+ for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
+ T item = entry.getKey();
+ ListNode2<Counter<T>> existing = secondCounter.counterMap.get(item);
+ if (existing != null) {
+ this.offer(item, secondCounter.counterMap.get(item).getValue().count);
+ this.counterMap.get(item).getValue().error = entry.getValue().getValue().error+ secondCounter.counterMap.get(item).getValue().error;
+
+ secondCounter.counterMap.remove(item);
+ } else {
+ this.offer(item, m2);
+ this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
+ }
+ }
+
+ for (Map.Entry<T, ListNode2<Counter<T>>> entry : secondCounter.counterMap.entrySet()) {
+ T item = entry.getKey();
+ double counter = entry.getValue().getValue().count;
+ double error = entry.getValue().getValue().error;
+ this.offer(item, counter + m1);
+ this.counterMap.get(item).getValue().error = error + m1;
+ }
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
new file mode 100644
index 0000000..fb82522
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class TopNCounterComparisonTest {
+
+ private static final int TOP_K = 100;
+
+ private static final int KEY_SPACE = 100 * TOP_K;
+
+ private static final int TOTAL_RECORDS = 100 * KEY_SPACE;
+
+ private static final int SPACE_SAVING_ROOM = 100;
+
+ @Before
+ public void setup() {
+ }
+
+ @After
+ public void tearDown() {
+ }
+
+ protected String prepareTestDate() throws IOException {
+ String[] allKeys = new String[KEY_SPACE];
+
+ for (int i = 0; i < KEY_SPACE; i++) {
+ allKeys[i] = RandomStringUtils.randomAlphabetic(10);
+ }
+
+ System.out.println("Start to create test random data...");
+ long startTime = System.currentTimeMillis();
+ ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+ int keyIndex;
+
+ File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+ if (tempFile.exists())
+ FileUtils.forceDelete(tempFile);
+ FileWriter fw = new FileWriter(tempFile);
+ try {
+ for (int i = 0; i < TOTAL_RECORDS; i++) {
+ keyIndex = zipf.sample();
+ fw.write(allKeys[keyIndex]);
+ fw.write('\n');
+ }
+ } finally {
+ if (fw != null)
+ fw.close();
+ }
+
+ System.out.println("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
+ System.out.println("Test data in : " + tempFile.getAbsolutePath());
+
+ return tempFile.getAbsolutePath();
+ }
+
+ //@Test
+ public void testCorrectness() throws IOException {
+ String dataFile = prepareTestDate();
+ TopNCounterComparisonTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterComparisonTest.SpaceSavingConsumer();
+ TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
+
+ for (TopNCounterComparisonTest.TestDataConsumer consumer : new TopNCounterComparisonTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+ feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
+ }
+
+ FileUtils.forceDelete(new File(dataFile));
+
+ compareResult(spaceSavingCounter, accurateCounter);
+ }
+
+ private void compareResult(TopNCounterComparisonTest.TestDataConsumer firstConsumer, TopNCounterComparisonTest.TestDataConsumer secondConsumer) {
+ List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+ System.out.println("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
+ List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
+ System.out.println("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
+
+ int error = 0;
+ for (int i = 0; i < topResult1.size(); i++) {
+ System.out.println("Compare " + i);
+
+ // if (topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+ if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst())
+ && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+ System.out.println("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+ } else {
+ System.out.println("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+ System.out.println("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+ error++;
+ }
+ }
+
+ Assert.assertEquals(0, error);
+ }
+
+ @Test
+ public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
+ String dataFile = prepareTestDate();
+
+ int PARALLEL = 10;
+ TopNCounterComparisonTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterComparisonTest.SpaceSavingConsumer[PARALLEL];
+
+ for (int i = 0; i < PARALLEL; i++) {
+ parallelCounters[i] = new TopNCounterComparisonTest.SpaceSavingConsumer();
+ }
+
+ int slice = TOTAL_RECORDS / PARALLEL;
+ int startPosition = 0;
+ for (int i = 0; i < PARALLEL; i++) {
+ feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
+ startPosition += slice;
+ }
+
+ // merge counters
+
+ // for (int i = 1; i < PARALLEL; i++) {
+ // parallelCounters[0].vs.merge(parallelCounters[i].vs);
+ // }
+
+ TopNCounterComparisonTest.SpaceSavingConsumer[] mergedCounters = mergeSpaceSavingConsumer(parallelCounters);
+
+ TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
+ feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+ compareResult(mergedCounters[0], accurateCounter);
+ FileUtils.forceDelete(new File(dataFile));
+
+ }
+
+ private TopNCounterComparisonTest.SpaceSavingConsumer[] mergeSpaceSavingConsumer(TopNCounterComparisonTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+ List<TopNCounterComparisonTest.SpaceSavingConsumer> list = Lists.newArrayList();
+ if (consumers.length == 1)
+ return consumers;
+
+ for (int i = 0, n = consumers.length; i < n; i = i + 2) {
+ if (i + 1 < n) {
+ consumers[i].vs.merge(consumers[i + 1].vs);
+ }
+
+ list.add(consumers[i]);
+ }
+
+ return mergeSpaceSavingConsumer(list.toArray(new TopNCounterComparisonTest.SpaceSavingConsumer[list.size()]));
+ }
+
+ private void feedDataToConsumer(String dataFile, TopNCounterComparisonTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
+ long startTime = System.currentTimeMillis();
+ BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
+
+ int lineNum = 0;
+ String line = bufferedReader.readLine();
+ while (line != null) {
+ if (lineNum >= startLine && lineNum < endLine) {
+ consumer.addElement(line, 1.0);
+ }
+ line = bufferedReader.readLine();
+ lineNum++;
+ }
+
+ bufferedReader.close();
+ System.out.println("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+ }
+
+ private static interface TestDataConsumer {
+ public void addElement(String elementKey, double value);
+
+ public List<Pair<String, Double>> getTopN(int k);
+
+ public long getSpentTime();
+ }
+
+ private class SpaceSavingConsumer implements TopNCounterComparisonTest.TestDataConsumer {
+ private long timeSpent = 0;
+ protected TopNCounter<String> vs;
+
+ public SpaceSavingConsumer() {
+ vs = new TopNCounter<String>(TOP_K * SPACE_SAVING_ROOM);
+
+ }
+
+ public void addElement(String key, double value) {
+ //System.out.println("Adding " + key + ":" + incrementCount);
+ long startTime = System.currentTimeMillis();
+ vs.offer(key, value);
+ timeSpent += (System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public List<Pair<String, Double>> getTopN(int k) {
+ long startTime = System.currentTimeMillis();
+ List<Counter<String>> tops = vs.topK(k);
+ List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+ for (Counter<String> counter : tops)
+ allRecords.add(new Pair(counter.getItem(), counter.getCount()));
+ timeSpent += (System.currentTimeMillis() - startTime);
+ return allRecords;
+ }
+
+ @Override
+ public long getSpentTime() {
+ return timeSpent;
+ }
+ }
+
+ private class HashMapConsumer implements TopNCounterComparisonTest.TestDataConsumer {
+
+ private long timeSpent = 0;
+ private Map<String, Double> hashMap;
+
+ public HashMapConsumer() {
+ hashMap = Maps.newHashMap();
+ }
+
+ public void addElement(String key, double value) {
+ long startTime = System.currentTimeMillis();
+ if (hashMap.containsKey(key)) {
+ hashMap.put(key, hashMap.get(key) + value);
+ } else {
+ hashMap.put(key, value);
+ }
+ timeSpent += (System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public List<Pair<String, Double>> getTopN(int k) {
+ long startTime = System.currentTimeMillis();
+ List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+ for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+ allRecords.add(new Pair(entry.getKey(), entry.getValue()));
+ }
+
+ Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
+ @Override
+ public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
+ return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
+ }
+ });
+ timeSpent += (System.currentTimeMillis() - startTime);
+ return allRecords.subList(0, k);
+ }
+
+ @Override
+ public long getSpentTime() {
+ return timeSpent;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
new file mode 100644
index 0000000..23620d1
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link TopNCounter}: top-k selection with single and weighted offers, and
+ * round trips through Java serialization and through toBytes() / TopNCounter(byte[]).
+ */
+public class TopNCounterTest {
+
+    /** Sample stream shared by most tests: X and A occur 4 times, C twice, Y, Z and B once each. */
+    private static final String[] STREAM = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
+
+    /** Creates a capacity-3 counter and offers every element of {@link #STREAM} once via offer(item). */
+    private static TopNCounter<String> newCounterFromStream() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        for (String item : STREAM) {
+            vs.offer(item);
+        }
+        return vs;
+    }
+
+    /**
+     * Writes {@code obj} with Java serialization and reads it back as a fresh instance. Streams are
+     * closed even if writing or reading fails.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> T javaSerializationRoundTrip(T obj) throws IOException, ClassNotFoundException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        try {
+            oos.writeObject(obj);
+        } finally {
+            oos.close();
+        }
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+        try {
+            // Unchecked but safe: the stream contains exactly the object written above.
+            return (T) ois.readObject();
+        } finally {
+            ois.close();
+        }
+    }
+
+    /** Asserts that {@code vs} survives a toBytes() / TopNCounter(byte[]) round trip. */
+    private void assertBytesRoundTrip(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
+        byte[] bytes = vs.toBytes();
+        TopNCounter<String> clone = new TopNCounter<String>(bytes);
+        assertEquals(vs.toString(), clone.toString());
+    }
+
+    /** Smoke test: prints the counter after every offer and the final top-k entries. */
+    @Test
+    public void testTopNCounter() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y" };
+        for (String i : stream) {
+            vs.offer(i);
+            System.out.println(vs);
+        }
+
+        List<Counter<String>> topk = vs.topK(6);
+        assertTrue("topK of a non-empty counter should not be empty", !topk.isEmpty());
+        for (Counter<String> top : topk) {
+            System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
+        }
+    }
+
+    @Test
+    public void testTopK() {
+        List<Counter<String>> topK = newCounterFromStream().topK(3);
+        // Without a size check the membership loop below would pass vacuously on an empty result.
+        assertEquals(3, topK.size());
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrement() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        for (String item : STREAM) {
+            vs.offer(item, 10);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        assertEquals(3, topK.size());
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    /** Weighted offers must rank items the same way as the equivalent number of single offers. */
+    @Test
+    public void testTopKWithIncrementOutOfOrder() {
+        TopNCounter<String> byIncrement = new TopNCounter<String>(3);
+        TopNCounter<String> bySingleOffers = new TopNCounter<String>(3);
+        String[] stream = { "A", "B", "C", "D", "A" };
+        int[] increments = { 15, 20, 25, 30, 1 };
+
+        for (int i = 0; i < stream.length; i++) {
+            byIncrement.offer(stream[i], increments[i]);
+            for (int k = 0; k < increments[i]; k++) {
+                bySingleOffers.offer(stream[i]);
+            }
+        }
+        System.out.println("Insert with counts vs. single inserts:");
+        System.out.println(byIncrement);
+        System.out.println(bySingleOffers);
+
+        List<Counter<String>> topKIncrement = byIncrement.topK(3);
+        List<Counter<String>> topKSingle = bySingleOffers.topK(3);
+
+        // Compare sizes first so a shorter list fails clearly instead of with IndexOutOfBoundsException.
+        assertEquals(topKIncrement.size(), topKSingle.size());
+        for (int i = 0; i < topKIncrement.size(); i++) {
+            assertEquals(topKIncrement.get(i).getItem(), topKSingle.get(i).getItem());
+        }
+    }
+
+    @Test
+    public void testCounterSerialization() throws IOException, ClassNotFoundException {
+        List<Counter<String>> topK = newCounterFromStream().topK(3);
+        for (Counter<String> c : topK) {
+            Counter<String> clone = javaSerializationRoundTrip(c);
+            assertEquals(c.getCount(), clone.getCount(), 0.0001);
+            assertEquals(c.getError(), clone.getError(), 0.0001);
+            assertEquals(c.getItem(), clone.getItem());
+        }
+    }
+
+    @Test
+    public void testSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = newCounterFromStream();
+        TopNCounter<String> clone = javaSerializationRoundTrip(vs);
+        assertEquals(vs.toString(), clone.toString());
+    }
+
+    @Test
+    public void testByteSerialization() throws IOException, ClassNotFoundException {
+        assertBytesRoundTrip(newCounterFromStream());
+
+        // Empty
+        assertBytesRoundTrip(new TopNCounter<String>(0));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 360c6b1..051df4d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
<commons-httpclient.version>3.1</commons-httpclient.version>
<commons-collections4.version>4.0</commons-collections4.version>
<commons-email.version>1.1</commons-email.version>
+ <commons-math3.version>3.6-SNAPSHOT</commons-math3.version>
<!-- Spark -->
<spark.version>1.3.0</spark.version>
@@ -386,6 +387,11 @@
<version>${commons-email.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>${commons-math3.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>