You are viewing a plain-text version of this content. (The canonical link that originally appeared here was not preserved in this copy.)
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/20 07:02:53 UTC

[5/5] kylin git commit: KYLIN-1917 TopN counter merge performance improvement

KYLIN-1917 TopN counter merge performance improvement


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cec8b9ed
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cec8b9ed
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cec8b9ed

Branch: refs/heads/yang21
Commit: cec8b9ed946ec9bb61d4b532c37fb3a69740489c
Parents: 0b3b6f4
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 20 14:54:17 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 15:00:17 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/measure/topn/Counter.java  |  17 +-
 .../apache/kylin/measure/topn/TopNCounter.java  | 236 +++++++------------
 .../measure/topn/TopNCounterSerializer.java     |   4 +-
 .../kylin/measure/topn/TopNMeasureType.java     |   2 +-
 .../topn/TopNCounterSerializerTest.java         |   2 +-
 .../measure/topn/TopNCounterBasicTest.java      |   2 +-
 6 files changed, 98 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
index 041ea2b..cd5b825 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
@@ -31,6 +31,8 @@ import java.io.ObjectOutput;
 public class Counter<T> implements Externalizable {
 
     protected T item;
+
+
     protected double count;
     //    protected double error;
 
@@ -42,10 +44,15 @@ public class Counter<T> implements Externalizable {
 
     public Counter(T item) {
         this.count = 0;
-        //        this.error = 0;
         this.item = item;
     }
 
+    public Counter(T item, double count) {
+        this.item = item;
+        this.count = count;
+    }
+
+
     public T getItem() {
         return item;
     }
@@ -54,13 +61,11 @@ public class Counter<T> implements Externalizable {
         return count;
     }
 
-    //    public double getError() {
-    //        return error;
-    //    }
-
+    public void setCount(double count) {
+        this.count = count;
+    }
     @Override
     public String toString() {
-        //        return item + ":" + count + ':' + error;
         return item + ":" + count;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index ab4b40e..cf9978a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,20 +19,22 @@
 package org.apache.kylin.measure.topn;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.common.util.Pair;
-
+import com.google.common.collect.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
  * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
- * 
+ *
  * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
  * data structure as described in:
  * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
@@ -45,30 +47,30 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
     public static final int EXTRA_SPACE_RATE = 50;
 
     protected int capacity;
-    private HashMap<T, ListNode2<Counter<T>>> counterMap;
-    protected DoublyLinkedList<Counter<T>> counterList;
+    private HashMap<T, Counter<T>> counterMap;
+    protected LinkedList<Counter<T>> counterList; // once consolidated, the first element has the highest count
+    private boolean ordered = true;
+    private boolean descending = true;
 
     /**
      * @param capacity maximum size (larger capacities improve accuracy)
      */
     public TopNCounter(int capacity) {
         this.capacity = capacity;
-        counterMap = new HashMap<T, ListNode2<Counter<T>>>();
-        counterList = new DoublyLinkedList<Counter<T>>();
+        counterMap = Maps.newHashMap();
+        counterList = Lists.newLinkedList();
     }
 
     public int getCapacity() {
         return capacity;
     }
 
-    /**
-     * Algorithm: <i>Space-Saving</i>
-     *
-     * @param item stream element (<i>e</i>)
-     * @return false if item was already in the stream summary, true otherwise
-     */
-    public boolean offer(T item) {
-        return offer(item, 1.0);
+    public LinkedList<Counter<T>> getCounterList() {
+        return counterList;
+    }
+
+    public void offer(T item) {
+        offer(item, 1.0);
     }
 
     /**
@@ -77,103 +79,35 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
      * @param item stream element (<i>e</i>)
      * @return false if item was already in the stream summary, true otherwise
      */
-    public boolean offer(T item, double incrementCount) {
-        return offerReturnAll(item, incrementCount).getFirst();
-    }
-
-    /**
-     * @param item stream element (<i>e</i>)
-     * @return item dropped from summary if an item was dropped, null otherwise
-     */
-    public T offerReturnDropped(T item, double incrementCount) {
-        return offerReturnAll(item, incrementCount).getSecond();
-    }
-
-    /**
-     * @param item stream element (<i>e</i>)
-     * @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and itemDropped is null if no item was dropped
-     */
-    public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
-        ListNode2<Counter<T>> counterNode = counterMap.get(item);
-        boolean isNewItem = (counterNode == null);
-        T droppedItem = null;
-        if (isNewItem) {
-
-            if (size() < capacity) {
-                counterNode = counterList.enqueue(new Counter<T>(item));
-            } else {
-                counterNode = counterList.tail();
-                Counter<T> counter = counterNode.getValue();
-                droppedItem = counter.item;
-                counterMap.remove(droppedItem);
-                counter.item = item;
-                counter.count = 0.0;
-            }
+    public void offer(T item, double incrementCount) {
+        Counter<T> counterNode = counterMap.get(item);
+        if (counterNode == null) {
+            counterNode = new Counter<T>(item, incrementCount);
             counterMap.put(item, counterNode);
-        }
-
-        incrementCounter(counterNode, incrementCount);
-
-        return Pair.newPair(isNewItem, droppedItem);
-    }
-
-    protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
-        Counter<T> counter = counterNode.getValue();
-        counter.count += incrementCount;
-
-        ListNode2<Counter<T>> nodeNext;
-
-        if (incrementCount > 0) {
-            nodeNext = counterNode.getNext();
-        } else {
-            nodeNext = counterNode.getPrev();
-        }
-        counterList.remove(counterNode);
-        counterNode.prev = null;
-        counterNode.next = null;
-
-        if (incrementCount > 0) {
-            while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
-                nodeNext = nodeNext.getNext();
-            }
-            if (nodeNext != null) {
-                counterList.addBefore(nodeNext, counterNode);
-            } else {
-                counterList.add(counterNode);
-            }
-
+            counterList.add(counterNode);
         } else {
-            while (nodeNext != null && counter.count < nodeNext.getValue().count) {
-                nodeNext = nodeNext.getPrev();
-            }
-            if (nodeNext != null) {
-                counterList.addAfter(nodeNext, counterNode);
-            } else {
-                counterList.enqueue(counterNode);
-            }
+            counterNode.setCount(counterNode.getCount() + incrementCount);
         }
-
+        ordered = false;
     }
 
-    public List<T> peek(int k) {
-        List<T> topK = new ArrayList<T>(k);
-
-        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Counter<T> b = bNode.getValue();
-            if (topK.size() == k) {
-                return topK;
-            }
-            topK.add(b.item);
-        }
-
-        return topK;
+    /**
+     * Re-sort the counters (descending by count by default) and trim the list to {@code capacity}.
+     */
+    public void consolidate() {
+        Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator);
+        retain(capacity);
+        ordered = true;
     }
 
     public List<Counter<T>> topK(int k) {
-        List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
-
-        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Counter<T> b = bNode.getValue();
+        if (ordered == false) {
+            consolidate();
+        }
+        List<Counter<T>> topK = new ArrayList<>(k);
+        Iterator<Counter<T>> iterator = counterList.iterator();
+        while (iterator.hasNext()) {
+            Counter<T> b = iterator.next();
             if (topK.size() == k) {
                 return topK;
             }
@@ -194,8 +128,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append('[');
-        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Counter<T> b = bNode.getValue();
+        Iterator<Counter<T>> iterator = counterList.iterator();
+        while (iterator.hasNext()) {
+            Counter<T> b = iterator.next();
             sb.append(b.item);
             sb.append(':');
             sb.append(b.count);
@@ -211,10 +146,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
      * @param count
      */
     public void offerToHead(T item, double count) {
-        Counter<T> c = new Counter<T>(item);
-        c.count = count;
-        ListNode2<Counter<T>> node = counterList.add(c);
-        counterMap.put(c.item, node);
+        Counter<T> c = new Counter<T>(item, count);
+        counterList.addFirst(c);
+        counterMap.put(c.item, c);
     }
 
     /**
@@ -225,19 +159,19 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
     public TopNCounter<T> merge(TopNCounter<T> another) {
         double m1 = 0.0, m2 = 0.0;
         if (this.size() >= this.capacity) {
-            m1 = this.counterList.tail().getValue().count;
+            m1 = this.counterList.getLast().count;
         }
 
         if (another.size() >= another.capacity) {
-            m2 = another.counterList.tail().getValue().count;
+            m2 = another.counterList.getLast().count;
         }
 
         Set<T> duplicateItems = Sets.newHashSet();
         List<T> notDuplicateItems = Lists.newArrayList();
 
-        for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
+        for (Map.Entry<T, Counter<T>> entry : this.counterMap.entrySet()) {
             T item = entry.getKey();
-            ListNode2<Counter<T>> existing = another.counterMap.get(item);
+            Counter<T> existing = another.counterMap.get(item);
             if (existing != null) {
                 duplicateItems.add(item);
             } else {
@@ -246,21 +180,22 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
         }
 
         for (T item : duplicateItems) {
-            this.offer(item, another.counterMap.get(item).getValue().count);
+            this.offer(item, another.counterMap.get(item).count);
         }
 
         for (T item : notDuplicateItems) {
             this.offer(item, m2);
         }
 
-        for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
+        for (Map.Entry<T, Counter<T>> entry : another.counterMap.entrySet()) {
             T item = entry.getKey();
             if (duplicateItems.contains(item) == false) {
-                double counter = entry.getValue().getValue().count;
+                double counter = entry.getValue().count;
                 this.offer(item, counter + m1);
             }
         }
 
+        this.consolidate();
         return this;
     }
 
@@ -271,13 +206,11 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
     public void retain(int newCapacity) {
         assert newCapacity > 0;
         this.capacity = newCapacity;
-        if (newCapacity < this.size()) {
-            ListNode2<Counter<T>> tail = counterList.tail();
-            while (tail != null && this.size() > newCapacity) {
-                Counter<T> bucket = tail.getValue();
-                this.counterMap.remove(bucket.getItem());
-                this.counterList.remove(tail);
-                tail = this.counterList.tail();
+        if (this.size() > newCapacity) {
+            Counter<T> toRemoved;
+            for (int i = 0, n = this.size() - newCapacity; i < n; i++) {
+                toRemoved = counterList.pollLast();
+                this.counterMap.remove(toRemoved.item);
             }
         }
 
@@ -291,10 +224,15 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
         double[] counters = new double[size()];
         int index = 0;
 
-        for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Counter<T> b = bNode.getValue();
-            counters[index] = b.count;
-            index++;
+        if (this.descending == true) {
+            Iterator<Counter<T>> iterator = counterList.descendingIterator();
+            while (iterator.hasNext()) {
+                Counter<T> b = iterator.next();
+                counters[index] = b.count;
+                index++;
+            }
+        } else {
+            throw new IllegalStateException(); // support in future
         }
 
         assert index == size();
@@ -303,37 +241,27 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
 
     @Override
     public Iterator<Counter<T>> iterator() {
-        return new TopNCounterIterator();
-    }
-
-    /**
-     * Iterator from the tail (smallest) to head (biggest);
-     */
-    private class TopNCounterIterator implements Iterator<Counter<T>> {
-
-        private ListNode2<Counter<T>> currentBNode;
-
-        private TopNCounterIterator() {
-            currentBNode = counterList.tail();
+        if (this.descending == true) {
+            return this.counterList.descendingIterator();
+        } else {
+            throw new IllegalStateException(); // support in future
         }
+    }
 
+    private static final Comparator ASC_Comparator = new Comparator<Counter>() {
         @Override
-        public boolean hasNext() {
-            return currentBNode != null;
-
+        public int compare(Counter o1, Counter o2) {
+            return o1.getCount() > o2.getCount() ? 1 : o1.getCount() == o2.getCount() ? 0 : -1;
         }
 
-        @Override
-        public Counter<T> next() {
-            Counter<T> counter = currentBNode.getValue();
-            currentBNode = currentBNode.getNext();
-            return counter;
-        }
+    };
 
+    private static final Comparator DESC_Comparator = new Comparator<Counter>() {
         @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
+        public int compare(Counter o1, Counter o2) {
+            return o1.getCount() > o2.getCount() ? -1 : o1.getCount() == o2.getCount() ? 0 : 1;
         }
-    }
+
+    };
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index 604365c..071e2a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -65,8 +65,8 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
     @Override
     public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
         double[] counters = value.getCounters();
-        List<ByteArray> peek = value.peek(1);
-        int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
+        List<Counter<ByteArray>> peek = value.topK(1);
+        int keyLength = peek.size() > 0 ? peek.get(0).getItem().length() : 0;
         out.putInt(value.getCapacity());
         out.putInt(value.size());
         out.putInt(keyLength);

http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 11a260a..761c17f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -260,7 +260,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
             };
         }
 
-        if (digest.aggregations.size() == 0 ) {
+        if (digest.aggregations.size() == 0) {
             // directly query the UHC column without sorting
             unmatchedDimensions.removeAll(literalCol);
             return new CapabilityInfluence() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
index 7e7fd31..dedb4f5 100644
--- a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
@@ -55,7 +55,7 @@ public class TopNCounterSerializerTest extends LocalFileMetadataTestCase {
         for (Integer i : stream) {
             vs.offer(new ByteArray(Bytes.toBytes(i)));
         }
-
+        vs.consolidate();
         ByteBuffer out = ByteBuffer.allocate(1024);
         serializer.serialize(vs, out);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cec8b9ed/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
index cb92338..162ef01 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
@@ -44,7 +44,7 @@ public class TopNCounterBasicTest {
 
     @Test
     public void testTopK() {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
+        TopNCounter<String> vs = new TopNCounter<>(3);
         String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
             vs.offer(i);