You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/20 06:28:01 UTC

[1/4] kylin git commit: KYLIN-1917 TopN counter merge performance improvement

Repository: kylin
Updated Branches:
  refs/heads/master b4329a66a -> 30a4162dd


KYLIN-1917 TopN counter merge performance improvement

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/30a4162d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/30a4162d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/30a4162d

Branch: refs/heads/master
Commit: 30a4162dd7af4158c90b6965a5bcf14119fae2e1
Parents: 501c12a
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 20 14:27:30 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/measure/topn/TopNCounter.java  | 16 +++++-----------
 .../kylin/source/kafka/job/SeekOffsetStep.java      | 15 +++++++--------
 2 files changed, 12 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/30a4162d/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index 072fe90..cf9978a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -96,14 +96,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
      */
     public void consolidate() {
         Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator);
-
-        if (this.size() > this.capacity) {
-            for (int x = this.size() - 1; x >= capacity; x--) {
-                Counter<T> removed = counterList.remove(x);
-                this.counterMap.remove(removed.item);
-            }
-        }
-
+        retain(capacity);
         ordered = true;
     }
 
@@ -214,9 +207,10 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
         assert newCapacity > 0;
         this.capacity = newCapacity;
         if (this.size() > newCapacity) {
-            for (int x = newCapacity; x < this.size(); x++) {
-                Counter<T> removed = counterList.remove(x);
-                this.counterMap.remove(removed.item);
+            Counter<T> toRemoved;
+            for (int i = 0, n = this.size() - newCapacity; i < n; i++) {
+                toRemoved = counterList.pollLast();
+                this.counterMap.remove(toRemoved.item);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/30a4162d/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
index a26f39d..98d6e4d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -84,16 +84,15 @@ public class SeekOffsetStep extends AbstractExecutable {
                         }
                     }
                 }
+                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
+            }
 
-                if (partitionInfos.size() > startOffsets.size()) {
-                    // has new partition added
-                    for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
-                        long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
-                        startOffsets.put(partitionInfos.get(x).partition(), earliest);
-                    }
+            if (partitionInfos.size() > startOffsets.size()) {
+                // has new partition added
+                for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
+                    long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
+                    startOffsets.put(partitionInfos.get(x).partition(), earliest);
                 }
-
-                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
             }
 
             if (endOffsets.isEmpty()) {


[2/4] kylin git commit: KYLIN-1917 TopN counter merge performance improvement

Posted by sh...@apache.org.
KYLIN-1917 TopN counter merge performance improvement

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0deabb65
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0deabb65
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0deabb65

Branch: refs/heads/master
Commit: 0deabb6504dd901b97585a7c25aea7643c80a6f7
Parents: 111e792
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 20 13:21:27 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/measure/topn/Counter.java  |  17 +-
 .../apache/kylin/measure/topn/TopNCounter.java  | 234 +++++++------------
 .../measure/topn/TopNCounterSerializer.java     |   4 +-
 .../topn/TopNCounterSerializerTest.java         |   2 +-
 .../measure/topn/TopNCounterBasicTest.java      |   2 +-
 5 files changed, 99 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
index 041ea2b..cd5b825 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
@@ -31,6 +31,8 @@ import java.io.ObjectOutput;
 public class Counter<T> implements Externalizable {
 
     protected T item;
+
+
     protected double count;
     //    protected double error;
 
@@ -42,10 +44,15 @@ public class Counter<T> implements Externalizable {
 
     public Counter(T item) {
         this.count = 0;
-        //        this.error = 0;
         this.item = item;
     }
 
+    public Counter(T item, double count) {
+        this.item = item;
+        this.count = count;
+    }
+
+
     public T getItem() {
         return item;
     }
@@ -54,13 +61,11 @@ public class Counter<T> implements Externalizable {
         return count;
     }
 
-    //    public double getError() {
-    //        return error;
-    //    }
-
+    public void setCount(double count) {
+        this.count = count;
+    }
     @Override
     public String toString() {
-        //        return item + ":" + count + ':' + error;
         return item + ":" + count;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index ab4b40e..072fe90 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,20 +19,22 @@
 package org.apache.kylin.measure.topn;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.common.util.Pair;
-
+import com.google.common.collect.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
  * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
- * 
+ *
  * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
  * data structure as described in:
  * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
@@ -45,30 +47,30 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
     public static final int EXTRA_SPACE_RATE = 50;
 
     protected int capacity;
-    private HashMap<T, ListNode2<Counter<T>>> counterMap;
-    protected DoublyLinkedList<Counter<T>> counterList;
+    private HashMap<T, Counter<T>> counterMap;
+    protected LinkedList<Counter<T>> counterList; //a linked list, first the is the toppest element
+    private boolean ordered = true;
+    private boolean descending = true;
 
     /**
      * @param capacity maximum size (larger capacities improve accuracy)
      */
     public TopNCounter(int capacity) {
         this.capacity = capacity;
-        counterMap = new HashMap<T, ListNode2<Counter<T>>>();
-        counterList = new DoublyLinkedList<Counter<T>>();
+        counterMap = Maps.newHashMap();
+        counterList = Lists.newLinkedList();
     }
 
     public int getCapacity() {
         return capacity;
     }
 
-    /**
-     * Algorithm: <i>Space-Saving</i>
-     *
-     * @param item stream element (<i>e</i>)
-     * @return false if item was already in the stream summary, true otherwise
-     */
-    public boolean offer(T item) {
-        return offer(item, 1.0);
+    public LinkedList<Counter<T>> getCounterList() {
+        return counterList;
+    }
+
+    public void offer(T item) {
+        offer(item, 1.0);
     }
 
     /**
@@ -77,103 +79,42 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
      * @param item stream element (<i>e</i>)
      * @return false if item was already in the stream summary, true otherwise
      */
-    public boolean offer(T item, double incrementCount) {
-        return offerReturnAll(item, incrementCount).getFirst();
-    }
-
-    /**
-     * @param item stream element (<i>e</i>)
-     * @return item dropped from summary if an item was dropped, null otherwise
-     */
-    public T offerReturnDropped(T item, double incrementCount) {
-        return offerReturnAll(item, incrementCount).getSecond();
-    }
-
-    /**
-     * @param item stream element (<i>e</i>)
-     * @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and itemDropped is null if no item was dropped
-     */
-    public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
-        ListNode2<Counter<T>> counterNode = counterMap.get(item);
-        boolean isNewItem = (counterNode == null);
-        T droppedItem = null;
-        if (isNewItem) {
-
-            if (size() < capacity) {
-                counterNode = counterList.enqueue(new Counter<T>(item));
-            } else {
-                counterNode = counterList.tail();
-                Counter<T> counter = counterNode.getValue();
-                droppedItem = counter.item;
-                counterMap.remove(droppedItem);
-                counter.item = item;
-                counter.count = 0.0;
-            }
+    public void offer(T item, double incrementCount) {
+        Counter<T> counterNode = counterMap.get(item);
+        if (counterNode == null) {
+            counterNode = new Counter<T>(item, incrementCount);
             counterMap.put(item, counterNode);
-        }
-
-        incrementCounter(counterNode, incrementCount);
-
-        return Pair.newPair(isNewItem, droppedItem);
-    }
-
-    protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
-        Counter<T> counter = counterNode.getValue();
-        counter.count += incrementCount;
-
-        ListNode2<Counter<T>> nodeNext;
-
-        if (incrementCount > 0) {
-            nodeNext = counterNode.getNext();
-        } else {
-            nodeNext = counterNode.getPrev();
-        }
-        counterList.remove(counterNode);
-        counterNode.prev = null;
-        counterNode.next = null;
-
-        if (incrementCount > 0) {
-            while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
-                nodeNext = nodeNext.getNext();
-            }
-            if (nodeNext != null) {
-                counterList.addBefore(nodeNext, counterNode);
-            } else {
-                counterList.add(counterNode);
-            }
-
+            counterList.add(counterNode);
         } else {
-            while (nodeNext != null && counter.count < nodeNext.getValue().count) {
-                nodeNext = nodeNext.getPrev();
-            }
-            if (nodeNext != null) {
-                counterList.addAfter(nodeNext, counterNode);
-            } else {
-                counterList.enqueue(counterNode);
-            }
+            counterNode.setCount(counterNode.getCount() + incrementCount);
         }
-
+        ordered = false;
     }
 
-    public List<T> peek(int k) {
-        List<T> topK = new ArrayList<T>(k);
+    /**
+     * Resort and keep the expected size
+     */
+    public void consolidate() {
+        Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator);
 
-        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Counter<T> b = bNode.getValue();
-            if (topK.size() == k) {
-                return topK;
+        if (this.size() > this.capacity) {
+            for (int x = this.size() - 1; x >= capacity; x--) {
+                Counter<T> removed = counterList.remove(x);
+                this.counterMap.remove(removed.item);
             }
-            topK.add(b.item);
         }
 
-        return topK;
+        ordered = true;
     }
 
     public List<Counter<T>> topK(int k) {
-        List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
-
-        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Counter<T> b = bNode.getValue();
+        if (ordered == false) {
+            consolidate();
+        }
+        List<Counter<T>> topK = new ArrayList<>(k);
+        Iterator<Counter<T>> iterator = counterList.iterator();
+        while (iterator.hasNext()) {
+            Counter<T> b = iterator.next();
             if (topK.size() == k) {
                 return topK;
             }
@@ -194,8 +135,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append('[');
-        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
-            Counter<T> b = bNode.getValue();
+        Iterator<Counter<T>> iterator = counterList.iterator();
+        while (iterator.hasNext()) {
+            Counter<T> b = iterator.next();
             sb.append(b.item);
             sb.append(':');
             sb.append(b.count);
@@ -211,10 +153,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
      * @param count
      */
     public void offerToHead(T item, double count) {
-        Counter<T> c = new Counter<T>(item);
-        c.count = count;
-        ListNode2<Counter<T>> node = counterList.add(c);
-        counterMap.put(c.item, node);
+        Counter<T> c = new Counter<T>(item, count);
+        counterList.addFirst(c);
+        counterMap.put(c.item, c);
     }
 
     /**
@@ -225,19 +166,19 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
     public TopNCounter<T> merge(TopNCounter<T> another) {
         double m1 = 0.0, m2 = 0.0;
         if (this.size() >= this.capacity) {
-            m1 = this.counterList.tail().getValue().count;
+            m1 = this.counterList.getLast().count;
         }
 
         if (another.size() >= another.capacity) {
-            m2 = another.counterList.tail().getValue().count;
+            m2 = another.counterList.getLast().count;
         }
 
         Set<T> duplicateItems = Sets.newHashSet();
         List<T> notDuplicateItems = Lists.newArrayList();
 
-        for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
+        for (Map.Entry<T, Counter<T>> entry : this.counterMap.entrySet()) {
             T item = entry.getKey();
-            ListNode2<Counter<T>> existing = another.counterMap.get(item);
+            Counter<T> existing = another.counterMap.get(item);
             if (existing != null) {
                 duplicateItems.add(item);
             } else {
@@ -246,21 +187,22 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
         }
 
         for (T item : duplicateItems) {
-            this.offer(item, another.counterMap.get(item).getValue().count);
+            this.offer(item, another.counterMap.get(item).count);
         }
 
         for (T item : notDuplicateItems) {
             this.offer(item, m2);
         }
 
-        for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
+        for (Map.Entry<T, Counter<T>> entry : another.counterMap.entrySet()) {
             T item = entry.getKey();
             if (duplicateItems.contains(item) == false) {
-                double counter = entry.getValue().getValue().count;
+                double counter = entry.getValue().count;
                 this.offer(item, counter + m1);
             }
         }
 
+        this.consolidate();
         return this;
     }
 
@@ -271,13 +213,10 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
     public void retain(int newCapacity) {
         assert newCapacity > 0;
         this.capacity = newCapacity;
-        if (newCapacity < this.size()) {
-            ListNode2<Counter<T>> tail = counterList.tail();
-            while (tail != null && this.size() > newCapacity) {
-                Counter<T> bucket = tail.getValue();
-                this.counterMap.remove(bucket.getItem());
-                this.counterList.remove(tail);
-                tail = this.counterList.tail();
+        if (this.size() > newCapacity) {
+            for (int x = newCapacity; x < this.size(); x++) {
+                Counter<T> removed = counterList.remove(x);
+                this.counterMap.remove(removed.item);
             }
         }
 
@@ -291,10 +230,15 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
         double[] counters = new double[size()];
         int index = 0;
 
-        for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Counter<T> b = bNode.getValue();
-            counters[index] = b.count;
-            index++;
+        if (this.descending == true) {
+            Iterator<Counter<T>> iterator = counterList.descendingIterator();
+            while (iterator.hasNext()) {
+                Counter<T> b = iterator.next();
+                counters[index] = b.count;
+                index++;
+            }
+        } else {
+            throw new IllegalStateException(); // support in future
         }
 
         assert index == size();
@@ -303,37 +247,27 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
 
     @Override
     public Iterator<Counter<T>> iterator() {
-        return new TopNCounterIterator();
-    }
-
-    /**
-     * Iterator from the tail (smallest) to head (biggest);
-     */
-    private class TopNCounterIterator implements Iterator<Counter<T>> {
-
-        private ListNode2<Counter<T>> currentBNode;
-
-        private TopNCounterIterator() {
-            currentBNode = counterList.tail();
+        if (this.descending == true) {
+            return this.counterList.descendingIterator();
+        } else {
+            throw new IllegalStateException(); // support in future
         }
+    }
 
+    private static final Comparator ASC_Comparator = new Comparator<Counter>() {
         @Override
-        public boolean hasNext() {
-            return currentBNode != null;
-
+        public int compare(Counter o1, Counter o2) {
+            return o1.getCount() > o2.getCount() ? 1 : o1.getCount() == o2.getCount() ? 0 : -1;
         }
 
-        @Override
-        public Counter<T> next() {
-            Counter<T> counter = currentBNode.getValue();
-            currentBNode = currentBNode.getNext();
-            return counter;
-        }
+    };
 
+    private static final Comparator DESC_Comparator = new Comparator<Counter>() {
         @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
+        public int compare(Counter o1, Counter o2) {
+            return o1.getCount() > o2.getCount() ? -1 : o1.getCount() == o2.getCount() ? 0 : 1;
         }
-    }
+
+    };
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index 604365c..071e2a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -65,8 +65,8 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
     @Override
     public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
         double[] counters = value.getCounters();
-        List<ByteArray> peek = value.peek(1);
-        int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
+        List<Counter<ByteArray>> peek = value.topK(1);
+        int keyLength = peek.size() > 0 ? peek.get(0).getItem().length() : 0;
         out.putInt(value.getCapacity());
         out.putInt(value.size());
         out.putInt(keyLength);

http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
index 7e7fd31..dedb4f5 100644
--- a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
@@ -55,7 +55,7 @@ public class TopNCounterSerializerTest extends LocalFileMetadataTestCase {
         for (Integer i : stream) {
             vs.offer(new ByteArray(Bytes.toBytes(i)));
         }
-
+        vs.consolidate();
         ByteBuffer out = ByteBuffer.allocate(1024);
         serializer.serialize(vs, out);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
index cb92338..162ef01 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
@@ -44,7 +44,7 @@ public class TopNCounterBasicTest {
 
     @Test
     public void testTopK() {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
+        TopNCounter<String> vs = new TopNCounter<>(3);
         String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
             vs.offer(i);


[3/4] kylin git commit: KYLIN-2112 Allow a column to be a dimension as well as a "group by" column in TopN measure

Posted by sh...@apache.org.
KYLIN-2112 Allow a column to be a dimension as well as a "group by" column in TopN measure


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/501c12a3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/501c12a3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/501c12a3

Branch: refs/heads/master
Commit: 501c12a3002f2d116a328fdad229f4b16e5bde75
Parents: 0deabb6
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 20 13:31:39 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800

----------------------------------------------------------------------
 .../model/validation/rule/FunctionRule.java     | 46 +++++++++-----------
 .../kylin/measure/topn/TopNMeasureType.java     |  2 -
 ...test_kylin_cube_with_slr_left_join_desc.json | 24 ++++++++++
 3 files changed, 44 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/501c12a3/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index 792f18d..bcc9010 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -62,10 +62,10 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
     public void validate(CubeDesc cube, ValidateContext context) {
         List<MeasureDesc> measures = cube.getMeasures();
 
-        if (validateMeasureNamesDuplicated(measures, context)) {
-            return;
-        }
-
+        if (validateMeasureNamesDuplicated(measures, context)) {
+            return;
+        }
+
         List<FunctionDesc> countFuncs = new ArrayList<FunctionDesc>();
 
         Iterator<MeasureDesc> it = measures.iterator();
@@ -126,12 +126,6 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
                     groupByCol = groupByCol.getNextParameter();
                 }
 
-                if (duplicatedCol.size() > 0) {
-                    context.addResult(ResultLevel.ERROR, "Couldn't use " + duplicatedCol.toString() + " in Top-N as it is already defined as dimension.");
-                    return;
-
-                }
-
             }
         }
 
@@ -189,20 +183,20 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
         }
 
     }
-
-    /**
-     * @param measures
-     */
-    private boolean validateMeasureNamesDuplicated(List<MeasureDesc> measures, ValidateContext context) {
-        Set<String> nameSet = new HashSet<>();
-        for (MeasureDesc measure: measures){
-            if (nameSet.contains(measure.getName())){
-                context.addResult(ResultLevel.ERROR, "There is duplicated measure's name: " + measure.getName());
-                return true;
-            } else {
-                nameSet.add(measure.getName());
-            }
-        }
-        return false;
-    }
+
+    /**
+     * @param measures
+     */
+    private boolean validateMeasureNamesDuplicated(List<MeasureDesc> measures, ValidateContext context) {
+        Set<String> nameSet = new HashSet<>();
+        for (MeasureDesc measure: measures){
+            if (nameSet.contains(measure.getName())){
+                context.addResult(ResultLevel.ERROR, "There is duplicated measure's name: " + measure.getName());
+                return true;
+            } else {
+                nameSet.add(measure.getName());
+            }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/501c12a3/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 2f93b77..3173bc1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -239,8 +239,6 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
             }
         }
 
-        if (unmatchedDimensions.containsAll(literalCol) == false)
-            return null;
         if (digest.groupbyColumns.containsAll(literalCol) == false)
             return null;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/501c12a3/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
index 5dbee21..ff2af55 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
@@ -148,6 +148,23 @@
         "returntype": "hllc(12)"
       },
       "dependent_measure_ref": null
+    }, {
+      "name" : "TOP_SELLER",
+      "function" : {
+        "expression" : "TOP_N",
+        "parameter" : {
+          "type" : "column",
+          "value" : "PRICE",
+          "next_parameter" : {
+            "type" : "column",
+            "value" : "SELLER_ID",
+            "next_parameter" : null
+          }
+        },
+        "returntype" : "topn(100)",
+        "configuration": {"topn.encoding.SELLER_ID" : "int:4"}
+      },
+      "dependent_measure_ref" : null
     }
   ],
   "rowkey": {
@@ -221,6 +238,13 @@
             ]
           }
         ]
+      },
+      {
+        "name" : "F3",
+        "columns" : [ {
+          "qualifier" : "M",
+          "measure_refs" : [ "TOP_SELLER" ]
+        } ]
       }
     ]
   },


[4/4] kylin git commit: minor, add cap for reducer number in RedistributeFlatHiveTableStep

Posted by sh...@apache.org.
minor, add cap for reducer number in RedistributeFlatHiveTableStep

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/111e7927
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/111e7927
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/111e7927

Branch: refs/heads/master
Commit: 111e79275ecf4393d427a17f7f6654d74e2ec1a9
Parents: b4329a6
Author: shaofengshi <sh...@apache.org>
Authored: Tue Oct 18 18:41:02 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kylin/source/hive/HiveMRInput.java     | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/111e7927/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index f3fceb1..f536cbb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -344,6 +344,7 @@ public class HiveMRInput implements IMRInput {
 
                 int numReducers = Math.round(rowCount / ((float) mapperInputRows));
                 numReducers = Math.max(1, numReducers);
+                numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
 
                 stepLogger.log("total input rows = " + rowCount);
                 stepLogger.log("expected input rows per mapper = " + mapperInputRows);