You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/20 06:28:01 UTC
[1/4] kylin git commit: KYLIN-1917 TopN counter merge performance improvement
Repository: kylin
Updated Branches:
refs/heads/master b4329a66a -> 30a4162dd
KYLIN-1917 TopN counter merge performance improvement
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/30a4162d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/30a4162d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/30a4162d
Branch: refs/heads/master
Commit: 30a4162dd7af4158c90b6965a5bcf14119fae2e1
Parents: 501c12a
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 20 14:27:30 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/measure/topn/TopNCounter.java | 16 +++++-----------
.../kylin/source/kafka/job/SeekOffsetStep.java | 15 +++++++--------
2 files changed, 12 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/30a4162d/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index 072fe90..cf9978a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -96,14 +96,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
*/
public void consolidate() {
Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator);
-
- if (this.size() > this.capacity) {
- for (int x = this.size() - 1; x >= capacity; x--) {
- Counter<T> removed = counterList.remove(x);
- this.counterMap.remove(removed.item);
- }
- }
-
+ retain(capacity);
ordered = true;
}
@@ -214,9 +207,10 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
assert newCapacity > 0;
this.capacity = newCapacity;
if (this.size() > newCapacity) {
- for (int x = newCapacity; x < this.size(); x++) {
- Counter<T> removed = counterList.remove(x);
- this.counterMap.remove(removed.item);
+ Counter<T> toRemoved;
+ for (int i = 0, n = this.size() - newCapacity; i < n; i++) {
+ toRemoved = counterList.pollLast();
+ this.counterMap.remove(toRemoved.item);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/30a4162d/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
index a26f39d..98d6e4d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -84,16 +84,15 @@ public class SeekOffsetStep extends AbstractExecutable {
}
}
}
+ logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
+ }
- if (partitionInfos.size() > startOffsets.size()) {
- // has new partition added
- for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
- long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
- startOffsets.put(partitionInfos.get(x).partition(), earliest);
- }
+ if (partitionInfos.size() > startOffsets.size()) {
+ // has new partition added
+ for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
+ long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
+ startOffsets.put(partitionInfos.get(x).partition(), earliest);
}
-
- logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
}
if (endOffsets.isEmpty()) {
[2/4] kylin git commit: KYLIN-1917 TopN counter merge performance improvement
Posted by sh...@apache.org.
KYLIN-1917 TopN counter merge performance improvement
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0deabb65
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0deabb65
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0deabb65
Branch: refs/heads/master
Commit: 0deabb6504dd901b97585a7c25aea7643c80a6f7
Parents: 111e792
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 20 13:21:27 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/measure/topn/Counter.java | 17 +-
.../apache/kylin/measure/topn/TopNCounter.java | 234 +++++++------------
.../measure/topn/TopNCounterSerializer.java | 4 +-
.../topn/TopNCounterSerializerTest.java | 2 +-
.../measure/topn/TopNCounterBasicTest.java | 2 +-
5 files changed, 99 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
index 041ea2b..cd5b825 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
@@ -31,6 +31,8 @@ import java.io.ObjectOutput;
public class Counter<T> implements Externalizable {
protected T item;
+
+
protected double count;
// protected double error;
@@ -42,10 +44,15 @@ public class Counter<T> implements Externalizable {
public Counter(T item) {
this.count = 0;
- // this.error = 0;
this.item = item;
}
+ public Counter(T item, double count) {
+ this.item = item;
+ this.count = count;
+ }
+
+
public T getItem() {
return item;
}
@@ -54,13 +61,11 @@ public class Counter<T> implements Externalizable {
return count;
}
- // public double getError() {
- // return error;
- // }
-
+ public void setCount(double count) {
+ this.count = count;
+ }
@Override
public String toString() {
- // return item + ":" + count + ':' + error;
return item + ":" + count;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index ab4b40e..072fe90 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,20 +19,22 @@
package org.apache.kylin.measure.topn;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kylin.common.util.Pair;
-
+import com.google.common.collect.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
* Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
- *
+ *
* Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
* data structure as described in:
* <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
@@ -45,30 +47,30 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
public static final int EXTRA_SPACE_RATE = 50;
protected int capacity;
- private HashMap<T, ListNode2<Counter<T>>> counterMap;
- protected DoublyLinkedList<Counter<T>> counterList;
+ private HashMap<T, Counter<T>> counterMap;
+ protected LinkedList<Counter<T>> counterList; //a linked list, first the is the toppest element
+ private boolean ordered = true;
+ private boolean descending = true;
/**
* @param capacity maximum size (larger capacities improve accuracy)
*/
public TopNCounter(int capacity) {
this.capacity = capacity;
- counterMap = new HashMap<T, ListNode2<Counter<T>>>();
- counterList = new DoublyLinkedList<Counter<T>>();
+ counterMap = Maps.newHashMap();
+ counterList = Lists.newLinkedList();
}
public int getCapacity() {
return capacity;
}
- /**
- * Algorithm: <i>Space-Saving</i>
- *
- * @param item stream element (<i>e</i>)
- * @return false if item was already in the stream summary, true otherwise
- */
- public boolean offer(T item) {
- return offer(item, 1.0);
+ public LinkedList<Counter<T>> getCounterList() {
+ return counterList;
+ }
+
+ public void offer(T item) {
+ offer(item, 1.0);
}
/**
@@ -77,103 +79,42 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
* @param item stream element (<i>e</i>)
* @return false if item was already in the stream summary, true otherwise
*/
- public boolean offer(T item, double incrementCount) {
- return offerReturnAll(item, incrementCount).getFirst();
- }
-
- /**
- * @param item stream element (<i>e</i>)
- * @return item dropped from summary if an item was dropped, null otherwise
- */
- public T offerReturnDropped(T item, double incrementCount) {
- return offerReturnAll(item, incrementCount).getSecond();
- }
-
- /**
- * @param item stream element (<i>e</i>)
- * @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and itemDropped is null if no item was dropped
- */
- public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
- ListNode2<Counter<T>> counterNode = counterMap.get(item);
- boolean isNewItem = (counterNode == null);
- T droppedItem = null;
- if (isNewItem) {
-
- if (size() < capacity) {
- counterNode = counterList.enqueue(new Counter<T>(item));
- } else {
- counterNode = counterList.tail();
- Counter<T> counter = counterNode.getValue();
- droppedItem = counter.item;
- counterMap.remove(droppedItem);
- counter.item = item;
- counter.count = 0.0;
- }
+ public void offer(T item, double incrementCount) {
+ Counter<T> counterNode = counterMap.get(item);
+ if (counterNode == null) {
+ counterNode = new Counter<T>(item, incrementCount);
counterMap.put(item, counterNode);
- }
-
- incrementCounter(counterNode, incrementCount);
-
- return Pair.newPair(isNewItem, droppedItem);
- }
-
- protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
- Counter<T> counter = counterNode.getValue();
- counter.count += incrementCount;
-
- ListNode2<Counter<T>> nodeNext;
-
- if (incrementCount > 0) {
- nodeNext = counterNode.getNext();
- } else {
- nodeNext = counterNode.getPrev();
- }
- counterList.remove(counterNode);
- counterNode.prev = null;
- counterNode.next = null;
-
- if (incrementCount > 0) {
- while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
- nodeNext = nodeNext.getNext();
- }
- if (nodeNext != null) {
- counterList.addBefore(nodeNext, counterNode);
- } else {
- counterList.add(counterNode);
- }
-
+ counterList.add(counterNode);
} else {
- while (nodeNext != null && counter.count < nodeNext.getValue().count) {
- nodeNext = nodeNext.getPrev();
- }
- if (nodeNext != null) {
- counterList.addAfter(nodeNext, counterNode);
- } else {
- counterList.enqueue(counterNode);
- }
+ counterNode.setCount(counterNode.getCount() + incrementCount);
}
-
+ ordered = false;
}
- public List<T> peek(int k) {
- List<T> topK = new ArrayList<T>(k);
+ /**
+ * Resort and keep the expected size
+ */
+ public void consolidate() {
+ Collections.sort(counterList, this.descending ? DESC_Comparator : ASC_Comparator);
- for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
- Counter<T> b = bNode.getValue();
- if (topK.size() == k) {
- return topK;
+ if (this.size() > this.capacity) {
+ for (int x = this.size() - 1; x >= capacity; x--) {
+ Counter<T> removed = counterList.remove(x);
+ this.counterMap.remove(removed.item);
}
- topK.add(b.item);
}
- return topK;
+ ordered = true;
}
public List<Counter<T>> topK(int k) {
- List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
-
- for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
- Counter<T> b = bNode.getValue();
+ if (ordered == false) {
+ consolidate();
+ }
+ List<Counter<T>> topK = new ArrayList<>(k);
+ Iterator<Counter<T>> iterator = counterList.iterator();
+ while (iterator.hasNext()) {
+ Counter<T> b = iterator.next();
if (topK.size() == k) {
return topK;
}
@@ -194,8 +135,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append('[');
- for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
- Counter<T> b = bNode.getValue();
+ Iterator<Counter<T>> iterator = counterList.iterator();
+ while (iterator.hasNext()) {
+ Counter<T> b = iterator.next();
sb.append(b.item);
sb.append(':');
sb.append(b.count);
@@ -211,10 +153,9 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
* @param count
*/
public void offerToHead(T item, double count) {
- Counter<T> c = new Counter<T>(item);
- c.count = count;
- ListNode2<Counter<T>> node = counterList.add(c);
- counterMap.put(c.item, node);
+ Counter<T> c = new Counter<T>(item, count);
+ counterList.addFirst(c);
+ counterMap.put(c.item, c);
}
/**
@@ -225,19 +166,19 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
public TopNCounter<T> merge(TopNCounter<T> another) {
double m1 = 0.0, m2 = 0.0;
if (this.size() >= this.capacity) {
- m1 = this.counterList.tail().getValue().count;
+ m1 = this.counterList.getLast().count;
}
if (another.size() >= another.capacity) {
- m2 = another.counterList.tail().getValue().count;
+ m2 = another.counterList.getLast().count;
}
Set<T> duplicateItems = Sets.newHashSet();
List<T> notDuplicateItems = Lists.newArrayList();
- for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
+ for (Map.Entry<T, Counter<T>> entry : this.counterMap.entrySet()) {
T item = entry.getKey();
- ListNode2<Counter<T>> existing = another.counterMap.get(item);
+ Counter<T> existing = another.counterMap.get(item);
if (existing != null) {
duplicateItems.add(item);
} else {
@@ -246,21 +187,22 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
}
for (T item : duplicateItems) {
- this.offer(item, another.counterMap.get(item).getValue().count);
+ this.offer(item, another.counterMap.get(item).count);
}
for (T item : notDuplicateItems) {
this.offer(item, m2);
}
- for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
+ for (Map.Entry<T, Counter<T>> entry : another.counterMap.entrySet()) {
T item = entry.getKey();
if (duplicateItems.contains(item) == false) {
- double counter = entry.getValue().getValue().count;
+ double counter = entry.getValue().count;
this.offer(item, counter + m1);
}
}
+ this.consolidate();
return this;
}
@@ -271,13 +213,10 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
public void retain(int newCapacity) {
assert newCapacity > 0;
this.capacity = newCapacity;
- if (newCapacity < this.size()) {
- ListNode2<Counter<T>> tail = counterList.tail();
- while (tail != null && this.size() > newCapacity) {
- Counter<T> bucket = tail.getValue();
- this.counterMap.remove(bucket.getItem());
- this.counterList.remove(tail);
- tail = this.counterList.tail();
+ if (this.size() > newCapacity) {
+ for (int x = newCapacity; x < this.size(); x++) {
+ Counter<T> removed = counterList.remove(x);
+ this.counterMap.remove(removed.item);
}
}
@@ -291,10 +230,15 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
double[] counters = new double[size()];
int index = 0;
- for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
- Counter<T> b = bNode.getValue();
- counters[index] = b.count;
- index++;
+ if (this.descending == true) {
+ Iterator<Counter<T>> iterator = counterList.descendingIterator();
+ while (iterator.hasNext()) {
+ Counter<T> b = iterator.next();
+ counters[index] = b.count;
+ index++;
+ }
+ } else {
+ throw new IllegalStateException(); // support in future
}
assert index == size();
@@ -303,37 +247,27 @@ public class TopNCounter<T> implements Iterable<Counter<T>> {
@Override
public Iterator<Counter<T>> iterator() {
- return new TopNCounterIterator();
- }
-
- /**
- * Iterator from the tail (smallest) to head (biggest);
- */
- private class TopNCounterIterator implements Iterator<Counter<T>> {
-
- private ListNode2<Counter<T>> currentBNode;
-
- private TopNCounterIterator() {
- currentBNode = counterList.tail();
+ if (this.descending == true) {
+ return this.counterList.descendingIterator();
+ } else {
+ throw new IllegalStateException(); // support in future
}
+ }
+ private static final Comparator ASC_Comparator = new Comparator<Counter>() {
@Override
- public boolean hasNext() {
- return currentBNode != null;
-
+ public int compare(Counter o1, Counter o2) {
+ return o1.getCount() > o2.getCount() ? 1 : o1.getCount() == o2.getCount() ? 0 : -1;
}
- @Override
- public Counter<T> next() {
- Counter<T> counter = currentBNode.getValue();
- currentBNode = currentBNode.getNext();
- return counter;
- }
+ };
+ private static final Comparator DESC_Comparator = new Comparator<Counter>() {
@Override
- public void remove() {
- throw new UnsupportedOperationException();
+ public int compare(Counter o1, Counter o2) {
+ return o1.getCount() > o2.getCount() ? -1 : o1.getCount() == o2.getCount() ? 0 : 1;
}
- }
+
+ };
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index 604365c..071e2a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -65,8 +65,8 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
@Override
public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
double[] counters = value.getCounters();
- List<ByteArray> peek = value.peek(1);
- int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
+ List<Counter<ByteArray>> peek = value.topK(1);
+ int keyLength = peek.size() > 0 ? peek.get(0).getItem().length() : 0;
out.putInt(value.getCapacity());
out.putInt(value.size());
out.putInt(keyLength);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
index 7e7fd31..dedb4f5 100644
--- a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
@@ -55,7 +55,7 @@ public class TopNCounterSerializerTest extends LocalFileMetadataTestCase {
for (Integer i : stream) {
vs.offer(new ByteArray(Bytes.toBytes(i)));
}
-
+ vs.consolidate();
ByteBuffer out = ByteBuffer.allocate(1024);
serializer.serialize(vs, out);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0deabb65/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
index cb92338..162ef01 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
@@ -44,7 +44,7 @@ public class TopNCounterBasicTest {
@Test
public void testTopK() {
- TopNCounter<String> vs = new TopNCounter<String>(3);
+ TopNCounter<String> vs = new TopNCounter<>(3);
String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
for (String i : stream) {
vs.offer(i);
[3/4] kylin git commit: KYLIN-2112 Allow a column be a dimension as well as "group by" column in TopN measure
Posted by sh...@apache.org.
KYLIN-2112 Allow a column be a dimension as well as "group by" column in TopN measure
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/501c12a3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/501c12a3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/501c12a3
Branch: refs/heads/master
Commit: 501c12a3002f2d116a328fdad229f4b16e5bde75
Parents: 0deabb6
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 20 13:31:39 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800
----------------------------------------------------------------------
.../model/validation/rule/FunctionRule.java | 46 +++++++++-----------
.../kylin/measure/topn/TopNMeasureType.java | 2 -
...test_kylin_cube_with_slr_left_join_desc.json | 24 ++++++++++
3 files changed, 44 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/501c12a3/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index 792f18d..bcc9010 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -62,10 +62,10 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
public void validate(CubeDesc cube, ValidateContext context) {
List<MeasureDesc> measures = cube.getMeasures();
- if (validateMeasureNamesDuplicated(measures, context)) {
- return;
- }
-
+ if (validateMeasureNamesDuplicated(measures, context)) {
+ return;
+ }
+
List<FunctionDesc> countFuncs = new ArrayList<FunctionDesc>();
Iterator<MeasureDesc> it = measures.iterator();
@@ -126,12 +126,6 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
groupByCol = groupByCol.getNextParameter();
}
- if (duplicatedCol.size() > 0) {
- context.addResult(ResultLevel.ERROR, "Couldn't use " + duplicatedCol.toString() + " in Top-N as it is already defined as dimension.");
- return;
-
- }
-
}
}
@@ -189,20 +183,20 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
}
}
-
- /**
- * @param measures
- */
- private boolean validateMeasureNamesDuplicated(List<MeasureDesc> measures, ValidateContext context) {
- Set<String> nameSet = new HashSet<>();
- for (MeasureDesc measure: measures){
- if (nameSet.contains(measure.getName())){
- context.addResult(ResultLevel.ERROR, "There is duplicated measure's name: " + measure.getName());
- return true;
- } else {
- nameSet.add(measure.getName());
- }
- }
- return false;
- }
+
+ /**
+ * @param measures
+ */
+ private boolean validateMeasureNamesDuplicated(List<MeasureDesc> measures, ValidateContext context) {
+ Set<String> nameSet = new HashSet<>();
+ for (MeasureDesc measure: measures){
+ if (nameSet.contains(measure.getName())){
+ context.addResult(ResultLevel.ERROR, "There is duplicated measure's name: " + measure.getName());
+ return true;
+ } else {
+ nameSet.add(measure.getName());
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/501c12a3/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 2f93b77..3173bc1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -239,8 +239,6 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
}
}
- if (unmatchedDimensions.containsAll(literalCol) == false)
- return null;
if (digest.groupbyColumns.containsAll(literalCol) == false)
return null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/501c12a3/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
index 5dbee21..ff2af55 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
@@ -148,6 +148,23 @@
"returntype": "hllc(12)"
},
"dependent_measure_ref": null
+ }, {
+ "name" : "TOP_SELLER",
+ "function" : {
+ "expression" : "TOP_N",
+ "parameter" : {
+ "type" : "column",
+ "value" : "PRICE",
+ "next_parameter" : {
+ "type" : "column",
+ "value" : "SELLER_ID",
+ "next_parameter" : null
+ }
+ },
+ "returntype" : "topn(100)",
+ "configuration": {"topn.encoding.SELLER_ID" : "int:4"}
+ },
+ "dependent_measure_ref" : null
}
],
"rowkey": {
@@ -221,6 +238,13 @@
]
}
]
+ },
+ {
+ "name" : "F3",
+ "columns" : [ {
+ "qualifier" : "M",
+ "measure_refs" : [ "TOP_SELLER" ]
+ } ]
}
]
},
[4/4] kylin git commit: minor, add cap for reducer number in RedistributeFlatHiveTableStep
Posted by sh...@apache.org.
minor, add cap for reducer number in RedistributeFlatHiveTableStep
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/111e7927
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/111e7927
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/111e7927
Branch: refs/heads/master
Commit: 111e79275ecf4393d427a17f7f6654d74e2ec1a9
Parents: b4329a6
Author: shaofengshi <sh...@apache.org>
Authored: Tue Oct 18 18:41:02 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 20 14:27:52 2016 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/source/hive/HiveMRInput.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/111e7927/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index f3fceb1..f536cbb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -344,6 +344,7 @@ public class HiveMRInput implements IMRInput {
int numReducers = Math.round(rowCount / ((float) mapperInputRows));
numReducers = Math.max(1, numReducers);
+ numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
stepLogger.log("total input rows = " + rowCount);
stepLogger.log("expected input rows per mapper = " + mapperInputRows);