You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/17 08:10:45 UTC
[1/7] incubator-kylin git commit: Add merge and retain in TopNCounter,
and also added unit test
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-943 [created] 38201be29
Add merge and retain in TopNCounter, and also added unit test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5980d95b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5980d95b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5980d95b
Branch: refs/heads/KYLIN-943
Commit: 5980d95bdd09b189af5fa81e9907245d353223bd
Parents: 7c50176
Author: shaofengshi <sh...@apache.org>
Authored: Mon Aug 24 22:22:05 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:20 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/topn/TopNCounter.java | 50 ++-
.../kylin/common/topn/TopNCounterBasicTest.java | 174 +++++++++
.../common/topn/TopNCounterCombinationTest.java | 65 ++++
.../common/topn/TopNCounterComparisonTest.java | 281 ---------------
.../kylin/common/topn/TopNCounterTest.java | 350 +++++++++++++------
5 files changed, 519 insertions(+), 401 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 0a45d0b..2f337c2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -48,6 +48,10 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
this.count = count;
this.counterList = new DoublyLinkedList<Counter<T>>();
}
+
+    /** @return the number of counters currently held in this bucket's counter list */
+    public int size() {
+        final int held = this.counterList.size();
+        return held;
+    }
}
protected int capacity;
@@ -297,33 +301,36 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
- public TopNCounter<T> merge(TopNCounter<T> another) throws IOException, ClassNotFoundException {
- TopNCounter<T> secondCounter = new TopNCounter<T>(another.capacity);
- secondCounter.fromBytes(another.toBytes());
+ /**
+ * Merge another counter into this counter; Note, the other counter will be changed in this method; please make a copy and passed in here;
+ * @param another
+ * @return
+ */
+ public TopNCounter<T> merge(TopNCounter<T> another) {
double m1 = 0.0, m2 = 0.0;
if (this.size() >= this.capacity) {
m1 = this.bucketList.tail().getValue().count;
}
- if (secondCounter.size() >= secondCounter.capacity) {
- m2 = secondCounter.bucketList.tail().getValue().count;
+ if (another.size() >= another.capacity) {
+ m2 = another.bucketList.tail().getValue().count;
}
for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
T item = entry.getKey();
- ListNode2<Counter<T>> existing = secondCounter.counterMap.get(item);
+ ListNode2<Counter<T>> existing = another.counterMap.get(item);
if (existing != null) {
- this.offer(item, secondCounter.counterMap.get(item).getValue().count);
- this.counterMap.get(item).getValue().error = entry.getValue().getValue().error+ secondCounter.counterMap.get(item).getValue().error;
+ this.offer(item, another.counterMap.get(item).getValue().count);
+ this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + another.counterMap.get(item).getValue().error;
- secondCounter.counterMap.remove(item);
+ another.counterMap.remove(item);
} else {
this.offer(item, m2);
this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
}
}
- for (Map.Entry<T, ListNode2<Counter<T>>> entry : secondCounter.counterMap.entrySet()) {
+ for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
T item = entry.getKey();
double counter = entry.getValue().getValue().count;
double error = entry.getValue().getValue().error;
@@ -333,4 +340,27 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
return this;
}
+
+    /**
+     * Lowers the capacity to {@code newCapacity} and evicts the lowest-count counters until at most
+     * {@code newCapacity} remain. Counters are evicted one at a time (not whole buckets), so exactly
+     * {@code newCapacity} entries survive when the counter held more than that.
+     * @param newCapacity the new capacity; must be positive
+     * @throws IllegalArgumentException if {@code newCapacity} is not positive
+     */
+    public void retain(int newCapacity) {
+        if (newCapacity <= 0)
+            throw new IllegalArgumentException("newCapacity must be positive, but was " + newCapacity);
+        this.capacity = newCapacity;
+        // The tail bucket is the one merge() reads as the minimum count, so evict from there. Each evicted
+        // counter is unlinked from its bucket, and emptied buckets are unlinked from bucketList, keeping
+        // bucketList consistent with counterMap (no dangling tail, no dropped higher-count buckets).
+        while (this.size() > newCapacity && bucketList.tail() != null) {
+            ListNode2<Bucket> minNode = bucketList.tail();
+            Bucket minBucket = minNode.getValue();
+            ListNode2<Counter<T>> victim = minBucket.counterList.tail();
+            this.counterMap.remove(victim.getValue().getItem());
+            minBucket.counterList.remove(victim); // NOTE(review): assumes DoublyLinkedList.remove(ListNode2) as in stream-lib
+            if (minBucket.counterList.size() == 0)
+                bucketList.remove(minNode);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
new file mode 100644
index 0000000..0d59314
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TopNCounterBasicTest {
+
+    /** Stream shared by most tests: 13 offers over the six distinct items X, Y, Z, A, B and C. */
+    private static final String[] STREAM = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+
+    /** Creates a counter with the given capacity and offers each element of {@code stream} once. */
+    private static TopNCounter<String> counterOf(int capacity, String[] stream) {
+        TopNCounter<String> vs = new TopNCounter<String>(capacity);
+        for (String item : stream) {
+            vs.offer(item);
+        }
+        return vs;
+    }
+
+    @Test
+    public void testTopNCounter() {
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+        TopNCounter<String> vs = counterOf(3, stream);
+
+        // Six distinct items were offered to a capacity-3 counter, so it is full; asking for
+        // more entries than the capacity can only return the 3 monitored counters.
+        List<Counter<String>> topk = vs.topK(6);
+        assertEquals(3, topk.size());
+
+        // In the space-saving algorithm each unit offer increments exactly one counter, so the
+        // monitored counts sum to the stream length, and no count falls below its error bound.
+        double total = 0;
+        for (Counter<String> top : topk) {
+            assertTrue(top.getCount() >= top.getError());
+            total += top.getCount();
+        }
+        assertEquals(stream.length, total, 0.0001);
+    }
+
+    @Test
+    public void testTopK() {
+        List<Counter<String>> topK = counterOf(3, STREAM).topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrement() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        for (String item : STREAM) {
+            vs.offer(item, 10);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    /** Offering an item once with increment n must rank items the same as offering it n times. */
+    @Test
+    public void testTopKWithIncrementOutOfOrder() {
+        TopNCounter<String> vs_increment = new TopNCounter<String>(3);
+        TopNCounter<String> vs_single = new TopNCounter<String>(3);
+        String[] stream = {"A", "B", "C", "D", "A"};
+        Integer[] increments = {15, 20, 25, 30, 1};
+
+        for (int i = 0; i < stream.length; i++) {
+            vs_increment.offer(stream[i], increments[i]);
+            for (int k = 0; k < increments[i]; k++) {
+                vs_single.offer(stream[i]);
+            }
+        }
+
+        List<Counter<String>> topK_increment = vs_increment.topK(3);
+        List<Counter<String>> topK_single = vs_single.topK(3);
+
+        assertEquals(topK_single.size(), topK_increment.size());
+        for (int i = 0; i < topK_increment.size(); i++) {
+            assertEquals(topK_increment.get(i).getItem(), topK_single.get(i).getItem());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCounterSerialization() throws IOException, ClassNotFoundException {
+        List<Counter<String>> topK = counterOf(3, STREAM).topK(3);
+        for (Counter<String> c : topK) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutput oo = new ObjectOutputStream(baos);
+            oo.writeObject(c);
+            oo.close();
+
+            ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+            Counter<String> clone = (Counter<String>) oi.readObject();
+            assertEquals(c.getCount(), clone.getCount(), 0.0001);
+            assertEquals(c.getError(), clone.getError(), 0.0001);
+            assertEquals(c.getItem(), clone.getItem());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSerialization() throws IOException, ClassNotFoundException {
+        TopNCounter<String> vs = counterOf(3, STREAM);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutput oo = new ObjectOutputStream(baos);
+        oo.writeObject(vs);
+        oo.close();
+
+        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+        TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+
+        assertEquals(vs.toString(), clone.toString());
+    }
+
+    @Test
+    public void testByteSerialization() throws IOException, ClassNotFoundException {
+        assertByteRoundTrip(counterOf(3, STREAM));
+
+        // An empty counter must round-trip as well.
+        assertByteRoundTrip(new TopNCounter<String>(0));
+    }
+
+    /** Serializes {@code vs} with toBytes(), rebuilds a counter from those bytes and checks both render identically. */
+    private void assertByteRoundTrip(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
+        byte[] bytes = vs.toBytes();
+        TopNCounter<String> clone = new TopNCounter<String>(bytes);
+        assertEquals(vs.toString(), clone.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
new file mode 100644
index 0000000..afde8f7
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Re-runs the accuracy tests inherited from {@link TopNCounterTest} under several parameter combinations.
+ */
+@RunWith(Parameterized.class)
+public class TopNCounterCombinationTest extends TopNCounterTest {
+
+    /**
+     * Each entry is {topK, keySpaceRate, spaceSavingRate}: the data uses topK * keySpaceRate distinct keys and
+     * the space-saving counters are sized topK * spaceSavingRate. Only the 50X-space settings are enabled;
+     * the 20X and 100X variants are kept commented out for manual runs.
+     */
+    @Parameterized.Parameters
+    public static Collection<Integer[]> configs() {
+        return Arrays.asList(new Integer[][] {
+                /*
+                // with 20X space
+                { 100, 10, 20 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 20 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 20 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 20 }, // top 100 among 100,000 keys (top 0.1%)
+                */
+                // with 50X space
+                { 100, 10, 50 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 50 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 50 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 50 }, // top 100 among 100,000 keys (top 0.1%)
+                /*
+                // with 100X space
+                { 100, 10, 100 }, // top 100 among 1,000 keys (top 10%)
+                { 100, 20, 100 }, // top 100 among 2,000 keys (top 5%)
+                { 100, 100, 100 }, // top 100 among 10,000 keys (top 1%)
+                { 100, 1000, 100 }, // top 100 among 100,000 keys (top 0.1%)
+                */
+        });
+    }
+
+    public TopNCounterCombinationTest(int topK, int keySpaceRate, int spaceSavingRate) throws Exception {
+        super();
+        // These are static fields of TopNCounterTest shared by all of its tests; the parent constructor sets
+        // defaults first, and they are overridden here for this parameter combination.
+        TOP_K = topK;
+        KEY_SPACE = TOP_K * keySpaceRate;
+        SPACE_SAVING_ROOM = spaceSavingRate;
+        TOTAL_RECORDS = 1000000; // 1 million
+        PARALLEL = 50;
+        verbose = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
deleted file mode 100644
index fb82522..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Copyright (C) 2011 Clearspring Technologies, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.topn;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import junit.framework.Assert;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.math3.distribution.ZipfDistribution;
-import org.apache.kylin.common.util.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class TopNCounterComparisonTest {
-
- private static final int TOP_K = 100;
-
- private static final int KEY_SPACE = 100 * TOP_K;
-
- private static final int TOTAL_RECORDS = 100 * KEY_SPACE;
-
- private static final int SPACE_SAVING_ROOM = 100;
-
- @Before
- public void setup() {
- }
-
- @After
- public void tearDown() {
- }
-
- protected String prepareTestDate() throws IOException {
- String[] allKeys = new String[KEY_SPACE];
-
- for (int i = 0; i < KEY_SPACE; i++) {
- allKeys[i] = RandomStringUtils.randomAlphabetic(10);
- }
-
- System.out.println("Start to create test random data...");
- long startTime = System.currentTimeMillis();
- ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
- int keyIndex;
-
- File tempFile = File.createTempFile("ZipfDistribution", ".txt");
-
- if (tempFile.exists())
- FileUtils.forceDelete(tempFile);
- FileWriter fw = new FileWriter(tempFile);
- try {
- for (int i = 0; i < TOTAL_RECORDS; i++) {
- keyIndex = zipf.sample();
- fw.write(allKeys[keyIndex]);
- fw.write('\n');
- }
- } finally {
- if (fw != null)
- fw.close();
- }
-
- System.out.println("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
- System.out.println("Test data in : " + tempFile.getAbsolutePath());
-
- return tempFile.getAbsolutePath();
- }
-
- //@Test
- public void testCorrectness() throws IOException {
- String dataFile = prepareTestDate();
- TopNCounterComparisonTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterComparisonTest.SpaceSavingConsumer();
- TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
-
- for (TopNCounterComparisonTest.TestDataConsumer consumer : new TopNCounterComparisonTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
- feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
- }
-
- FileUtils.forceDelete(new File(dataFile));
-
- compareResult(spaceSavingCounter, accurateCounter);
- }
-
- private void compareResult(TopNCounterComparisonTest.TestDataConsumer firstConsumer, TopNCounterComparisonTest.TestDataConsumer secondConsumer) {
- List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
- System.out.println("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
- List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
- System.out.println("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
-
- int error = 0;
- for (int i = 0; i < topResult1.size(); i++) {
- System.out.println("Compare " + i);
-
- // if (topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
- if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst())
- && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
- System.out.println("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
- } else {
- System.out.println("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
- System.out.println("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
- error++;
- }
- }
-
- Assert.assertEquals(0, error);
- }
-
- @Test
- public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
- String dataFile = prepareTestDate();
-
- int PARALLEL = 10;
- TopNCounterComparisonTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterComparisonTest.SpaceSavingConsumer[PARALLEL];
-
- for (int i = 0; i < PARALLEL; i++) {
- parallelCounters[i] = new TopNCounterComparisonTest.SpaceSavingConsumer();
- }
-
- int slice = TOTAL_RECORDS / PARALLEL;
- int startPosition = 0;
- for (int i = 0; i < PARALLEL; i++) {
- feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
- startPosition += slice;
- }
-
- // merge counters
-
- // for (int i = 1; i < PARALLEL; i++) {
- // parallelCounters[0].vs.merge(parallelCounters[i].vs);
- // }
-
- TopNCounterComparisonTest.SpaceSavingConsumer[] mergedCounters = mergeSpaceSavingConsumer(parallelCounters);
-
- TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
- feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
-
- compareResult(mergedCounters[0], accurateCounter);
- FileUtils.forceDelete(new File(dataFile));
-
- }
-
- private TopNCounterComparisonTest.SpaceSavingConsumer[] mergeSpaceSavingConsumer(TopNCounterComparisonTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
- List<TopNCounterComparisonTest.SpaceSavingConsumer> list = Lists.newArrayList();
- if (consumers.length == 1)
- return consumers;
-
- for (int i = 0, n = consumers.length; i < n; i = i + 2) {
- if (i + 1 < n) {
- consumers[i].vs.merge(consumers[i + 1].vs);
- }
-
- list.add(consumers[i]);
- }
-
- return mergeSpaceSavingConsumer(list.toArray(new TopNCounterComparisonTest.SpaceSavingConsumer[list.size()]));
- }
-
- private void feedDataToConsumer(String dataFile, TopNCounterComparisonTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
- long startTime = System.currentTimeMillis();
- BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
-
- int lineNum = 0;
- String line = bufferedReader.readLine();
- while (line != null) {
- if (lineNum >= startLine && lineNum < endLine) {
- consumer.addElement(line, 1.0);
- }
- line = bufferedReader.readLine();
- lineNum++;
- }
-
- bufferedReader.close();
- System.out.println("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
- }
-
- private static interface TestDataConsumer {
- public void addElement(String elementKey, double value);
-
- public List<Pair<String, Double>> getTopN(int k);
-
- public long getSpentTime();
- }
-
- private class SpaceSavingConsumer implements TopNCounterComparisonTest.TestDataConsumer {
- private long timeSpent = 0;
- protected TopNCounter<String> vs;
-
- public SpaceSavingConsumer() {
- vs = new TopNCounter<String>(TOP_K * SPACE_SAVING_ROOM);
-
- }
-
- public void addElement(String key, double value) {
- //System.out.println("Adding " + key + ":" + incrementCount);
- long startTime = System.currentTimeMillis();
- vs.offer(key, value);
- timeSpent += (System.currentTimeMillis() - startTime);
- }
-
- @Override
- public List<Pair<String, Double>> getTopN(int k) {
- long startTime = System.currentTimeMillis();
- List<Counter<String>> tops = vs.topK(k);
- List<Pair<String, Double>> allRecords = Lists.newArrayList();
-
- for (Counter<String> counter : tops)
- allRecords.add(new Pair(counter.getItem(), counter.getCount()));
- timeSpent += (System.currentTimeMillis() - startTime);
- return allRecords;
- }
-
- @Override
- public long getSpentTime() {
- return timeSpent;
- }
- }
-
- private class HashMapConsumer implements TopNCounterComparisonTest.TestDataConsumer {
-
- private long timeSpent = 0;
- private Map<String, Double> hashMap;
-
- public HashMapConsumer() {
- hashMap = Maps.newHashMap();
- }
-
- public void addElement(String key, double value) {
- long startTime = System.currentTimeMillis();
- if (hashMap.containsKey(key)) {
- hashMap.put(key, hashMap.get(key) + value);
- } else {
- hashMap.put(key, value);
- }
- timeSpent += (System.currentTimeMillis() - startTime);
- }
-
- @Override
- public List<Pair<String, Double>> getTopN(int k) {
- long startTime = System.currentTimeMillis();
- List<Pair<String, Double>> allRecords = Lists.newArrayList();
-
- for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
- allRecords.add(new Pair(entry.getKey(), entry.getValue()));
- }
-
- Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
- @Override
- public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
- return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
- }
- });
- timeSpent += (System.currentTimeMillis() - startTime);
- return allRecords.subList(0, k);
- }
-
- @Override
- public long getSpentTime() {
- return timeSpent;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5980d95b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index 23620d1..e5350ba 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -16,159 +16,289 @@
package org.apache.kylin.common.topn;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
import org.junit.Test;
import java.io.*;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+public class TopNCounterTest {
+ protected static int TOP_K;
-public class TopNCounterTest {
+ protected static int KEY_SPACE;
- private static final int NUM_ITERATIONS = 100000;
+ protected static int TOTAL_RECORDS;
- @Test
- public void testTopNCounter() {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
- for (String i : stream) {
- vs.offer(i);
- /*
- for(String s : vs.poll(3))
- System.out.print(s+" ");
- */
- System.out.println(vs);
- }
-
- List<Counter<String>> topk = vs.topK(6);
-
- for(Counter<String> top : topk) {
- System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
- }
-
+ protected static int SPACE_SAVING_ROOM;
+
+ protected static int PARALLEL = 10;
+
+ protected static boolean verbose = true;
+
+ public TopNCounterTest() {
+ TOP_K = 100;
+ KEY_SPACE = 100 * TOP_K;
+ TOTAL_RECORDS = 1000000; // 1 million
+ SPACE_SAVING_ROOM = 100;
}
- @Test
- public void testTopK() {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
+ protected String prepareTestDate() throws IOException {
+ String[] allKeys = new String[KEY_SPACE];
+
+ for (int i = 0; i < KEY_SPACE; i++) {
+ allKeys[i] = RandomStringUtils.randomAlphabetic(10);
}
- List<Counter<String>> topK = vs.topK(3);
- for (Counter<String> c : topK) {
- assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+
+ outputMsg("Start to create test random data...");
+ long startTime = System.currentTimeMillis();
+ ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+ int keyIndex;
+
+ File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+ if (tempFile.exists())
+ FileUtils.forceDelete(tempFile);
+ FileWriter fw = new FileWriter(tempFile);
+ try {
+ for (int i = 0; i < TOTAL_RECORDS; i++) {
+ keyIndex = zipf.sample();
+ fw.write(allKeys[keyIndex]);
+ fw.write('\n');
+ }
+ } finally {
+ if (fw != null)
+ fw.close();
}
+
+ outputMsg("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
+ outputMsg("Test data in : " + tempFile.getAbsolutePath());
+
+ return tempFile.getAbsolutePath();
}
@Test
- public void testTopKWithIncrement() {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i, 10);
+ public void testSingleSpaceSaving() throws IOException {
+ String dataFile = prepareTestDate();
+ TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
+ TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer();
+
+ for (TopNCounterTest.TestDataConsumer consumer : new TopNCounterTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+ feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
}
- List<Counter<String>> topK = vs.topK(3);
- for (Counter<String> c : topK) {
- assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+
+ FileUtils.forceDelete(new File(dataFile));
+
+ compareResult(spaceSavingCounter, accurateCounter);
+ }
+
+ private void compareResult(TopNCounterTest.TestDataConsumer firstConsumer, TopNCounterTest.TestDataConsumer secondConsumer) {
+ List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+ outputMsg("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
+ List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
+ outputMsg("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
+
+ int error = 0;
+ for (int i = 0; i < topResult1.size(); i++) {
+ outputMsg("Compare " + i);
+
+ if (isClose(topResult1.get(i).getSecond().doubleValue(), realSequence.get(i).getSecond().doubleValue())) {
+ // if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst()) && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+ outputMsg("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+ } else {
+ outputMsg("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+ outputMsg("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+ error++;
+ }
}
+
+ org.junit.Assert.assertEquals(0, error);
+ }
+
+ private boolean isClose(double value1, double value2) {
+
+ if(Math.abs(value1 - value2) < 5.0)
+ return true;
+
+ return false;
}
@Test
- public void testTopKWithIncrementOutOfOrder() {
- TopNCounter<String> vs_increment = new TopNCounter<String>(3);
- TopNCounter<String> vs_single = new TopNCounter<String>(3);
- String[] stream = {"A", "B", "C", "D", "A"};
- Integer[] increments = {15, 20, 25, 30, 1};
-
- for (int i = 0; i < stream.length; i++) {
- vs_increment.offer(stream[i], increments[i]);
- for (int k = 0; k < increments[i]; k++) {
- vs_single.offer(stream[i]);
- }
+ public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
+ String dataFile = prepareTestDate();
+
+ TopNCounterTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterTest.SpaceSavingConsumer[PARALLEL];
+
+ for (int i = 0; i < PARALLEL; i++) {
+ parallelCounters[i] = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
}
- System.out.println("Insert with counts vs. single inserts:");
- System.out.println(vs_increment);
- System.out.println(vs_single);
- List<Counter<String>> topK_increment = vs_increment.topK(3);
- List<Counter<String>> topK_single = vs_single.topK(3);
+ int slice = TOTAL_RECORDS / PARALLEL;
+ int startPosition = 0;
+ for (int i = 0; i < PARALLEL; i++) {
+ feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
+ startPosition += slice;
+ }
+
+ TopNCounterTest.SpaceSavingConsumer[] mergedCounters = singleMerge(parallelCounters);
+
+ TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer();
+ feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+ compareResult(mergedCounters[0], accurateCounter);
+ FileUtils.forceDelete(new File(dataFile));
+
+ }
+
+ private TopNCounterTest.SpaceSavingConsumer[] singleMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+ List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+ if (consumers.length == 1)
+ return consumers;
- for (int i = 0; i < topK_increment.size(); i++) {
- assertEquals(topK_increment.get(i).getItem(),
- topK_single.get(i).getItem());
+ TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * 10);
+
+ for (int i=0, n=consumers.length; i<n; i++) {
+ merged.vs.merge(consumers[i].vs);
}
+
+ merged.vs.retain(TOP_K * SPACE_SAVING_ROOM); // remove extra elements;
+ return new TopNCounterTest.SpaceSavingConsumer[] {merged};
+
}
- @SuppressWarnings("unchecked")
- @Test
- public void testCounterSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
- }
- List<Counter<String>> topK = vs.topK(3);
- for (Counter<String> c : topK) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutput oo = new ObjectOutputStream(baos);
- oo.writeObject(c);
- oo.close();
-
- ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- Counter<String> clone = (Counter<String>) oi.readObject();
- assertEquals(c.getCount(), clone.getCount(), 0.0001);
- assertEquals(c.getError(), clone.getError(), 0.0001);
- assertEquals(c.getItem(), clone.getItem());
+ private TopNCounterTest.SpaceSavingConsumer[] binaryMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+ List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+ if (consumers.length == 1)
+ return consumers;
+
+
+ for (int i = 0, n = consumers.length; i < n; i = i + 2) {
+ if (i + 1 < n) {
+ consumers[i].vs.merge(consumers[i + 1].vs);
+ }
+
+ list.add(consumers[i]);
}
+
+ return binaryMerge(list.toArray(new TopNCounterTest.SpaceSavingConsumer[list.size()]));
}
+
+ private void feedDataToConsumer(String dataFile, TopNCounterTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
+ long startTime = System.currentTimeMillis();
+ BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
- @SuppressWarnings("unchecked")
- @Test
- public void testSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
+ int lineNum = 0;
+ String line = bufferedReader.readLine();
+ while (line != null) {
+ if (lineNum >= startLine && lineNum < endLine) {
+ consumer.addElement(line, 1.0);
+ }
+ line = bufferedReader.readLine();
+ lineNum++;
}
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutput oo = new ObjectOutputStream(baos);
- oo.writeObject(vs);
- oo.close();
+ bufferedReader.close();
+ outputMsg("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+ }
- ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+ private void outputMsg(String msg) {
+ if (verbose)
+ System.out.println(msg);
+ }
- assertEquals(vs.toString(), clone.toString());
+ private static interface TestDataConsumer {
+ public void addElement(String elementKey, double value);
+
+ public List<Pair<String, Double>> getTopN(int k);
+
+ public long getSpentTime();
}
+ private class SpaceSavingConsumer implements TopNCounterTest.TestDataConsumer {
+ private long timeSpent = 0;
+ protected TopNCounter<String> vs;
+
+ public SpaceSavingConsumer(int space) {
+ vs = new TopNCounter<String>(space);
- @Test
- public void testByteSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
}
- testSerialization(vs);
+ public void addElement(String key, double value) {
+ //outputMsg("Adding " + key + ":" + incrementCount);
+ long startTime = System.currentTimeMillis();
+ vs.offer(key, value);
+ timeSpent += (System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public List<Pair<String, Double>> getTopN(int k) {
+ long startTime = System.currentTimeMillis();
+ List<Counter<String>> tops = vs.topK(k);
+ List<Pair<String, Double>> allRecords = Lists.newArrayList();
- // Empty
- vs = new TopNCounter<String>(0);
- testSerialization(vs);
+ for (Counter<String> counter : tops)
+ allRecords.add(new Pair(counter.getItem(), counter.getCount()));
+ timeSpent += (System.currentTimeMillis() - startTime);
+ return allRecords;
+ }
+
+ @Override
+ public long getSpentTime() {
+ return timeSpent;
+ }
}
- private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
- byte[] bytes = vs.toBytes();
- TopNCounter<String> clone = new TopNCounter<String>(bytes);
+ private class HashMapConsumer implements TopNCounterTest.TestDataConsumer {
+
+ private long timeSpent = 0;
+ private Map<String, Double> hashMap;
+
+ public HashMapConsumer() {
+ hashMap = Maps.newHashMap();
+ }
+
+ public void addElement(String key, double value) {
+ long startTime = System.currentTimeMillis();
+ if (hashMap.containsKey(key)) {
+ hashMap.put(key, hashMap.get(key) + value);
+ } else {
+ hashMap.put(key, value);
+ }
+ timeSpent += (System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public List<Pair<String, Double>> getTopN(int k) {
+ long startTime = System.currentTimeMillis();
+ List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+ for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+ allRecords.add(new Pair(entry.getKey(), entry.getValue()));
+ }
- assertEquals(vs.toString(), clone.toString());
+ Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
+ @Override
+ public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
+ return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
+ }
+ });
+ timeSpent += (System.currentTimeMillis() - startTime);
+ return allRecords.subList(0, k);
+ }
+
+ @Override
+ public long getSpentTime() {
+ return timeSpent;
+ }
}
+
}
[6/7] incubator-kylin git commit: KYLIN-943 Update test case
Posted by sh...@apache.org.
KYLIN-943 Update test case
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/af4883e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/af4883e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/af4883e0
Branch: refs/heads/KYLIN-943
Commit: af4883e0185f71a239c7b7deb7609214a23b99af
Parents: f3fe0e5
Author: shaofengshi <sh...@apache.org>
Authored: Sun Sep 6 17:01:53 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800
----------------------------------------------------------------------
.../common/topn/TopNCounterCombinationTest.java | 2 ++
.../serializer/TopNCounterSerializerTest.java | 24 ++++++++++++++++++--
.../kylin/storage/hybrid/HybridInstance.java | 2 +-
.../localmeta/project/default.json | 5 ++++
.../test_case_data/sandbox/kylin.properties | 10 ++++----
.../kylin/job/BuildCubeWithEngineTest.java | 18 ++++++++++++++-
6 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index cc0557e..3d809df 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -16,6 +16,7 @@
package org.apache.kylin.common.topn;
+import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -23,6 +24,7 @@ import java.util.Arrays;
import java.util.Collection;
@RunWith(Parameterized.class)
+@Ignore ("For collecting accuracy statistics, not for functional test")
public class TopNCounterCombinationTest extends TopNCounterTest {
@Parameterized.Parameters
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
index 050193b..3c88446 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
@@ -3,6 +3,7 @@ package org.apache.kylin.metadata.measure.serializer;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.model.DataType;
import org.junit.Assert;
import org.junit.Test;
@@ -18,14 +19,14 @@ public class TopNCounterSerializerTest {
@SuppressWarnings("unchecked")
@Test
- public void testCounterSerialization() {
+ public void testSerialization() {
TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
for (Integer i : stream) {
vs.offer(new ByteArray(Bytes.toBytes(i)));
}
- ByteBuffer out = ByteBuffer.allocate(1024 * 1024);
+ ByteBuffer out = ByteBuffer.allocate(1024);
serializer.serialize(vs, out);
byte[] copyBytes = new byte[out.position()];
@@ -37,5 +38,24 @@ public class TopNCounterSerializerTest {
Assert.assertEquals(vs.toString(), vsNew.toString());
}
+
+ @Test
+ public void testValueOf() {
+
+ TopNCounter<ByteArray> origin = new TopNCounter<ByteArray>(10);
+ ByteArray key = new ByteArray(1);
+ ByteBuffer byteBuffer = key.asBuffer();
+ BytesUtil.writeVLong(20l, byteBuffer);
+ origin.offer(key, 1.0);
+
+ byteBuffer = ByteBuffer.allocate(1024);
+ byteBuffer.putInt(1);
+ byteBuffer.putInt(20);
+ byteBuffer.putDouble(1.0);
+ TopNCounter<ByteArray> counter = serializer.valueOf(byteBuffer.array());
+
+
+ Assert.assertEquals(origin.toString(), counter.toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index e9f0975..6ad27d5 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -277,7 +277,7 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
}
public IRealization getLatestRealization() {
- if (realizations.length > 0) {
+ if (getRealizations().length > 0) {
return realizations[realizations.length - 1];
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/examples/test_case_data/localmeta/project/default.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/project/default.json b/examples/test_case_data/localmeta/project/default.json
index e4118ca..71eb1fa 100644
--- a/examples/test_case_data/localmeta/project/default.json
+++ b/examples/test_case_data/localmeta/project/default.json
@@ -23,6 +23,11 @@
"realization": "test_kylin_cube_without_slr_left_join_empty"
},
{
+ "name": "test_kylin_cube_topn",
+ "type": "CUBE",
+ "realization": "test_kylin_cube_topn"
+ },
+ {
"name": "test_kylin_ii_left_join",
"type": "INVERTED_INDEX",
"realization": "test_kylin_ii_left_join"
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index beb9c44..5b05f87 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -16,16 +16,16 @@ kylin.job.mapreduce.default.reduce.input.mb=500
# If true, job engine will not assume that hadoop CLI reside on the same server as it self
# you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password
-kylin.job.run.as.remote.cmd=false
+kylin.job.run.as.remote.cmd=true
# Only necessary when kylin.job.run.as.remote.cmd=true
-kylin.job.remote.cli.hostname=
+kylin.job.remote.cli.hostname=sandbox
# Only necessary when kylin.job.run.as.remote.cmd=true
-kylin.job.remote.cli.username=
+kylin.job.remote.cli.username=root
# Only necessary when kylin.job.run.as.remote.cmd=true
-kylin.job.remote.cli.password=
+kylin.job.remote.cli.password=hadoop
# Used by test cases to prepare synthetic data for sample cube
kylin.job.remote.cli.working.dir=/tmp/kylin
@@ -34,7 +34,7 @@ kylin.job.remote.cli.working.dir=/tmp/kylin
kylin.job.concurrent.max.limit=10
# Whether calculate cube in mem in each mapper;
-kylin.job.cubing.inMem=true
+kylin.job.cubing.inMem=false
#the percentage of the sampling, default 25%
kylin.job.cubing.inMem.sampling.percent=25
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af4883e0/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index d7eb3cf..7c6b028 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -125,7 +125,8 @@ public class BuildCubeWithEngineTest {
}
private void testInner() throws Exception {
- String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", };
+ String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
+// String[] testCase = new String[] { "testInnerJoinTopNCube" };
runTestAndAssertSucceed(testCase);
}
@@ -183,6 +184,21 @@ public class BuildCubeWithEngineTest {
}
}
+
+ @SuppressWarnings("unused")
+ // called by reflection
+ private List<String> testInnerJoinTopNCube() throws Exception {
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+ long date1 = 0;
+ long date2 = f.parse("2013-01-01").getTime();
+ long date3 = f.parse("2022-01-01").getTime();
+ List<String> result = Lists.newArrayList();
+ result.add(buildSegment("test_kylin_cube_topn", date1, date2));
+ result.add(buildSegment("test_kylin_cube_topn", date2, date3));
+ return result;
+ }
+
@SuppressWarnings("unused")
// called by reflection
private List<String> testInnerJoinCube2() throws Exception {
[2/7] incubator-kylin git commit: KYLIN-943 initial commit for function and performance test
Posted by sh...@apache.org.
KYLIN-943 initial commit for function and performance test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7c501760
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7c501760
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7c501760
Branch: refs/heads/KYLIN-943
Commit: 7c501760050e0d9a9160442ecf87dca554ededfa
Parents: 4456bb1
Author: shaofengshi <sh...@apache.org>
Authored: Mon Aug 24 10:05:27 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:20 2015 +0800
----------------------------------------------------------------------
core-common/pom.xml | 4 +
.../org/apache/kylin/common/topn/Counter.java | 85 +++++
.../kylin/common/topn/DoublyLinkedList.java | 188 +++++++++++
.../org/apache/kylin/common/topn/ITopK.java | 53 +++
.../org/apache/kylin/common/topn/ListNode2.java | 51 +++
.../apache/kylin/common/topn/TopNCounter.java | 336 +++++++++++++++++++
.../common/topn/TopNCounterComparisonTest.java | 281 ++++++++++++++++
.../kylin/common/topn/TopNCounterTest.java | 174 ++++++++++
pom.xml | 6 +
9 files changed, 1178 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 577db42..f9e715d 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -65,6 +65,10 @@
<artifactId>commons-email</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
new file mode 100644
index 0000000..0f7d8de
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Modified from Counter.java in https://github.com/addthis/stream-lib
+ *
+ * @param <T>
+ */
+public class Counter<T> implements Externalizable {
+
+ protected ListNode2<TopNCounter<T>.Bucket> bucketNode;
+
+ protected T item;
+ protected double count;
+ protected double error;
+
+ /**
+ * For de-serialization
+ */
+ public Counter() {
+ }
+
+ public Counter(ListNode2<TopNCounter<T>.Bucket> bucket, T item) {
+ this.bucketNode = bucket;
+ this.count = 0;
+ this.error = 0;
+ this.item = item;
+ }
+
+ public T getItem() {
+ return item;
+ }
+
+ public double getCount() {
+ return count;
+ }
+
+ public double getError() {
+ return error;
+ }
+
+ @Override
+ public String toString() {
+ return item + ":" + count + ':' + error;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ item = (T) in.readObject();
+ count = in.readDouble();
+ error = in.readDouble();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(item);
+ out.writeDouble(count);
+ out.writeDouble(error);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
new file mode 100644
index 0000000..0942b84
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+
+/**
+ * Modified from DoublyLinkedList.java in https://github.com/addthis/stream-lib
+ *
+ * @param <T>
+ */
+public class DoublyLinkedList<T> implements Iterable<T> {
+
+ protected int size;
+ protected ListNode2<T> tail;
+ protected ListNode2<T> head;
+
+ /**
+ * Append to head of list
+ */
+ public ListNode2<T> add(T value) {
+ ListNode2<T> node = new ListNode2<T>(value);
+ if (size++ == 0) {
+ tail = node;
+ } else {
+ node.prev = head;
+ head.next = node;
+ }
+
+ head = node;
+
+ return node;
+ }
+
+ /**
+ * Prepend to tail of list
+ */
+ public ListNode2<T> enqueue(T value) {
+ ListNode2<T> node = new ListNode2<T>(value);
+ if (size++ == 0) {
+ head = node;
+ } else {
+ node.next = tail;
+ tail.prev = node;
+ }
+
+ tail = node;
+
+ return node;
+ }
+
+ public void add(ListNode2<T> node) {
+ node.prev = head;
+ node.next = null;
+
+ if (size++ == 0) {
+ tail = node;
+ } else {
+ head.next = node;
+ }
+
+ head = node;
+ }
+
+ public ListNode2<T> addAfter(ListNode2<T> node, T value) {
+ ListNode2<T> newNode = new ListNode2<T>(value);
+ addAfter(node, newNode);
+ return newNode;
+ }
+
+ public void addAfter(ListNode2<T> node, ListNode2<T> newNode) {
+ newNode.next = node.next;
+ newNode.prev = node;
+ node.next = newNode;
+ if (newNode.next == null) {
+ head = newNode;
+ } else {
+ newNode.next.prev = newNode;
+ }
+ size++;
+ }
+
+ public void remove(ListNode2<T> node) {
+ if (node == tail) {
+ tail = node.next;
+ } else {
+ node.prev.next = node.next;
+ }
+
+ if (node == head) {
+ head = node.prev;
+ } else {
+ node.next.prev = node.prev;
+ }
+ size--;
+ }
+
+ public int size() {
+ return size;
+ }
+
+
+ @Override
+ public Iterator<T> iterator() {
+ return new DoublyLinkedListIterator(this);
+ }
+
+ protected class DoublyLinkedListIterator implements Iterator<T> {
+
+ protected DoublyLinkedList<T> list;
+ protected ListNode2<T> itr;
+ protected int length;
+
+ public DoublyLinkedListIterator(DoublyLinkedList<T> list) {
+ this.length = list.size;
+ this.list = list;
+ this.itr = list.tail;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return itr != null;
+ }
+
+ @Override
+ public T next() {
+ if (length != list.size) {
+ throw new ConcurrentModificationException();
+ }
+ T next = itr.value;
+ itr = itr.next;
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ public T first() {
+ return tail == null ? null : tail.getValue();
+ }
+
+ public T last() {
+ return head == null ? null : head.getValue();
+ }
+
+ public ListNode2<T> head() {
+ return head;
+ }
+
+ public ListNode2<T> tail() {
+ return tail;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T[] toArray() {
+ T[] a = (T[]) new Object[size];
+ int i = 0;
+ for (T v : this) {
+ a[i++] = v;
+ }
+ return a;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java b/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
new file mode 100644
index 0000000..36603b7
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/ITopK.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import java.util.List;
+
+/**
+ * Modified from https://github.com/addthis/stream-lib
+ *
+ * @param <T>
+ */
+public interface ITopK<T> {
+
+ /**
+ * offer a single element to the top.
+ *
+ * @param element - the element to add to the top
+ * @return false if the element was already in the top
+ */
+ boolean offer(T element);
+
+ /**
+ * offer a single element to the top and increment the count
+ * for that element by incrementCount.
+ *
+ * @param element - the element to add to the top
+ * @param incrementCount - the increment count for the given count
+ * @return false if the element was already in the top
+ */
+ boolean offer(T element, double incrementCount);
+
+ /**
+ * @param k
+ * @return top k elements offered (may be an approximation)
+ */
+ List<T> peek(int k);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java b/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
new file mode 100644
index 0000000..92f5f57
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/ListNode2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+/**
+ * Modified from ListNode2.java in https://github.com/addthis/stream-lib
+ *
+ * @param <T>
+ */
+public class ListNode2<T> {
+
+ protected T value;
+ protected ListNode2<T> prev;
+ protected ListNode2<T> next;
+
+ public ListNode2(T value) {
+ this.value = value;
+ }
+
+ public ListNode2<T> getPrev() {
+ return prev;
+ }
+
+ public ListNode2<T> getNext() {
+ return next;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public void setValue(T value) {
+ this.value = value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
new file mode 100644
index 0000000..0a45d0b
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.util.Pair;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
+ *
+ * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
+ * data structure as described in:
+ * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
+ * by Metwally, Agrawal, and Abbadi
+ *
+ * @param <T> type of data in the stream to be summarized
+ */
+public class TopNCounter<T> implements ITopK<T>, Externalizable {
+
+ protected class Bucket {
+
+ protected DoublyLinkedList<Counter<T>> counterList;
+
+ private double count;
+
+ public Bucket(double count) {
+ this.count = count;
+ this.counterList = new DoublyLinkedList<Counter<T>>();
+ }
+ }
+
+ protected int capacity;
+ private HashMap<T, ListNode2<Counter<T>>> counterMap;
+ protected DoublyLinkedList<Bucket> bucketList;
+
+ /**
+ * @param capacity maximum size (larger capacities improve accuracy)
+ */
+ public TopNCounter(int capacity) {
+ this.capacity = capacity;
+ counterMap = new HashMap<T, ListNode2<Counter<T>>>();
+ bucketList = new DoublyLinkedList<Bucket>();
+ }
+
+ public int getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * Algorithm: <i>Space-Saving</i>
+ *
+ * @param item stream element (<i>e</i>)
+ * @return false if item was already in the stream summary, true otherwise
+ */
+ @Override
+ public boolean offer(T item) {
+ return offer(item, 1.0);
+ }
+
+ /**
+ * Algorithm: <i>Space-Saving</i>
+ *
+ * @param item stream element (<i>e</i>)
+ * @return false if item was already in the stream summary, true otherwise
+ */
+ @Override
+ public boolean offer(T item, double incrementCount) {
+ return offerReturnAll(item, incrementCount).getFirst();
+ }
+
+ /**
+ * @param item stream element (<i>e</i>)
+ * @return item dropped from summary if an item was dropped, null otherwise
+ */
+ public T offerReturnDropped(T item, int incrementCount) {
+ return offerReturnAll(item, incrementCount).getSecond();
+ }
+
+ /**
+ * @param item stream element (<i>e</i>)
+ * @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and itemDropped is null if no item was dropped
+ */
+ public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
+ ListNode2<Counter<T>> counterNode = counterMap.get(item);
+ boolean isNewItem = (counterNode == null);
+ T droppedItem = null;
+ if (isNewItem) {
+
+ if (size() < capacity) {
+ counterNode = bucketList.enqueue(new Bucket(0)).getValue().counterList.add(new Counter<T>(bucketList.tail(), item));
+ } else {
+ Bucket min = bucketList.first();
+ counterNode = min.counterList.tail();
+ Counter<T> counter = counterNode.getValue();
+ droppedItem = counter.item;
+ counterMap.remove(droppedItem);
+ counter.item = item;
+ counter.error = min.count;
+ }
+ counterMap.put(item, counterNode);
+ }
+
+ incrementCounter(counterNode, incrementCount);
+
+ return new Pair<Boolean, T>(isNewItem, droppedItem);
+ }
+
+ protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
+ Counter<T> counter = counterNode.getValue(); // count_i
+ ListNode2<Bucket> oldNode = counter.bucketNode;
+ Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
+ bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
+ counter.count = counter.count + incrementCount;
+
+ // Finding the right bucket for count_i
+ // Because we allow a single call to increment count more than once, this may not be the adjacent bucket.
+ ListNode2<Bucket> bucketNodePrev = oldNode;
+ ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
+ while (bucketNodeNext != null) {
+ Bucket bucketNext = bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
+ if (counter.count == bucketNext.count) {
+ bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
+ break;
+ } else if (counter.count > bucketNext.count) {
+ bucketNodePrev = bucketNodeNext;
+ bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
+ } else {
+ // A new bucket has to be created
+ bucketNodeNext = null;
+ }
+ }
+
+ if (bucketNodeNext == null) {
+ Bucket bucketNext = new Bucket(counter.count);
+ bucketNext.counterList.add(counterNode);
+ bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
+ }
+ counter.bucketNode = bucketNodeNext;
+
+ //Cleaning up
+ if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
+ {
+ bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
+ }
+ }
+
+ @Override
+ public List<T> peek(int k) {
+ List<T> topK = new ArrayList<T>(k);
+
+ for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Bucket b = bNode.getValue();
+ for (Counter<T> c : b.counterList) {
+ if (topK.size() == k) {
+ return topK;
+ }
+ topK.add(c.item);
+ }
+ }
+
+ return topK;
+ }
+
+ public List<Counter<T>> topK(int k) {
+ List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
+
+ for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Bucket b = bNode.getValue();
+ for (Counter<T> c : b.counterList) {
+ if (topK.size() == k) {
+ return topK;
+ }
+ topK.add(c);
+ }
+ }
+
+ return topK;
+ }
+
+ /**
+ * @return number of items stored
+ */
+ public int size() {
+ return counterMap.size();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Bucket b = bNode.getValue();
+ sb.append('{');
+ sb.append(b.count);
+ sb.append(":[");
+ for (Counter<T> c : b.counterList) {
+ sb.append('{');
+ sb.append(c.item);
+ sb.append(':');
+ sb.append(c.error);
+ sb.append("},");
+ }
+ if (b.counterList.size() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ sb.append("]},");
+ }
+ if (bucketList.size() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ sb.append(']');
+ return sb.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ this.bucketList = new DoublyLinkedList<Bucket>();
+ this.capacity = in.readInt();
+
+ int size = in.readInt();
+ this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
+
+ Bucket currentBucket = null;
+ ListNode2<Bucket> currentBucketNode = null;
+ for (int i = 0; i < size; i++) {
+ Counter<T> c = (Counter<T>) in.readObject();
+ if (currentBucket == null || c.count != currentBucket.count) {
+ currentBucket = new Bucket(c.count);
+ currentBucketNode = bucketList.add(currentBucket);
+ }
+ c.bucketNode = currentBucketNode;
+ counterMap.put(c.item, currentBucket.counterList.add(c));
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(this.capacity);
+ out.writeInt(this.size());
+ for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
+ Bucket b = bNode.getValue();
+ for (Counter<T> c : b.counterList) {
+ out.writeObject(c);
+ }
+ }
+ }
+
+ /**
+ * For de-serialization
+ */
+ public TopNCounter() {
+ }
+
+ /**
+ * For de-serialization
+ *
+ * @param bytes
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ */
+ public TopNCounter(byte[] bytes) throws IOException, ClassNotFoundException {
+ fromBytes(bytes);
+ }
+
+ public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
+ readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
+ }
+
+ public byte[] toBytes() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(baos);
+ this.writeExternal(out);
+ out.flush();
+ return baos.toByteArray();
+
+ }
+
+ public TopNCounter<T> merge(TopNCounter<T> another) throws IOException, ClassNotFoundException {
+ TopNCounter<T> secondCounter = new TopNCounter<T>(another.capacity);
+ secondCounter.fromBytes(another.toBytes());
+ double m1 = 0.0, m2 = 0.0;
+ if (this.size() >= this.capacity) {
+ m1 = this.bucketList.tail().getValue().count;
+ }
+
+ if (secondCounter.size() >= secondCounter.capacity) {
+ m2 = secondCounter.bucketList.tail().getValue().count;
+ }
+
+ for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
+ T item = entry.getKey();
+ ListNode2<Counter<T>> existing = secondCounter.counterMap.get(item);
+ if (existing != null) {
+ this.offer(item, secondCounter.counterMap.get(item).getValue().count);
+ this.counterMap.get(item).getValue().error = entry.getValue().getValue().error+ secondCounter.counterMap.get(item).getValue().error;
+
+ secondCounter.counterMap.remove(item);
+ } else {
+ this.offer(item, m2);
+ this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
+ }
+ }
+
+ for (Map.Entry<T, ListNode2<Counter<T>>> entry : secondCounter.counterMap.entrySet()) {
+ T item = entry.getKey();
+ double counter = entry.getValue().getValue().count;
+ double error = entry.getValue().getValue().error;
+ this.offer(item, counter + m1);
+ this.counterMap.get(item).getValue().error = error + m1;
+ }
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
new file mode 100644
index 0000000..fb82522
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class TopNCounterComparisonTest {
+
+ private static final int TOP_K = 100;
+
+ private static final int KEY_SPACE = 100 * TOP_K;
+
+ private static final int TOTAL_RECORDS = 100 * KEY_SPACE;
+
+ private static final int SPACE_SAVING_ROOM = 100;
+
+ @Before
+ public void setup() {
+ }
+
+ @After
+ public void tearDown() {
+ }
+
+ protected String prepareTestDate() throws IOException {
+ String[] allKeys = new String[KEY_SPACE];
+
+ for (int i = 0; i < KEY_SPACE; i++) {
+ allKeys[i] = RandomStringUtils.randomAlphabetic(10);
+ }
+
+ System.out.println("Start to create test random data...");
+ long startTime = System.currentTimeMillis();
+ ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+ int keyIndex;
+
+ File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+ if (tempFile.exists())
+ FileUtils.forceDelete(tempFile);
+ FileWriter fw = new FileWriter(tempFile);
+ try {
+ for (int i = 0; i < TOTAL_RECORDS; i++) {
+ keyIndex = zipf.sample();
+ fw.write(allKeys[keyIndex]);
+ fw.write('\n');
+ }
+ } finally {
+ if (fw != null)
+ fw.close();
+ }
+
+ System.out.println("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
+ System.out.println("Test data in : " + tempFile.getAbsolutePath());
+
+ return tempFile.getAbsolutePath();
+ }
+
+ //@Test
+ public void testCorrectness() throws IOException {
+ String dataFile = prepareTestDate();
+ TopNCounterComparisonTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterComparisonTest.SpaceSavingConsumer();
+ TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
+
+ for (TopNCounterComparisonTest.TestDataConsumer consumer : new TopNCounterComparisonTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+ feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
+ }
+
+ FileUtils.forceDelete(new File(dataFile));
+
+ compareResult(spaceSavingCounter, accurateCounter);
+ }
+
+ private void compareResult(TopNCounterComparisonTest.TestDataConsumer firstConsumer, TopNCounterComparisonTest.TestDataConsumer secondConsumer) {
+ List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+ System.out.println("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
+ List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
+ System.out.println("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
+
+ int error = 0;
+ for (int i = 0; i < topResult1.size(); i++) {
+ System.out.println("Compare " + i);
+
+ // if (topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+ if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst())
+ && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+ System.out.println("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+ } else {
+ System.out.println("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+ System.out.println("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+ error++;
+ }
+ }
+
+ Assert.assertEquals(0, error);
+ }
+
+ @Test
+ public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
+ String dataFile = prepareTestDate();
+
+ int PARALLEL = 10;
+ TopNCounterComparisonTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterComparisonTest.SpaceSavingConsumer[PARALLEL];
+
+ for (int i = 0; i < PARALLEL; i++) {
+ parallelCounters[i] = new TopNCounterComparisonTest.SpaceSavingConsumer();
+ }
+
+ int slice = TOTAL_RECORDS / PARALLEL;
+ int startPosition = 0;
+ for (int i = 0; i < PARALLEL; i++) {
+ feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
+ startPosition += slice;
+ }
+
+ // merge counters
+
+ // for (int i = 1; i < PARALLEL; i++) {
+ // parallelCounters[0].vs.merge(parallelCounters[i].vs);
+ // }
+
+ TopNCounterComparisonTest.SpaceSavingConsumer[] mergedCounters = mergeSpaceSavingConsumer(parallelCounters);
+
+ TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
+ feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+ compareResult(mergedCounters[0], accurateCounter);
+ FileUtils.forceDelete(new File(dataFile));
+
+ }
+
+ private TopNCounterComparisonTest.SpaceSavingConsumer[] mergeSpaceSavingConsumer(TopNCounterComparisonTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+ List<TopNCounterComparisonTest.SpaceSavingConsumer> list = Lists.newArrayList();
+ if (consumers.length == 1)
+ return consumers;
+
+ for (int i = 0, n = consumers.length; i < n; i = i + 2) {
+ if (i + 1 < n) {
+ consumers[i].vs.merge(consumers[i + 1].vs);
+ }
+
+ list.add(consumers[i]);
+ }
+
+ return mergeSpaceSavingConsumer(list.toArray(new TopNCounterComparisonTest.SpaceSavingConsumer[list.size()]));
+ }
+
+ private void feedDataToConsumer(String dataFile, TopNCounterComparisonTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
+ long startTime = System.currentTimeMillis();
+ BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
+
+ int lineNum = 0;
+ String line = bufferedReader.readLine();
+ while (line != null) {
+ if (lineNum >= startLine && lineNum < endLine) {
+ consumer.addElement(line, 1.0);
+ }
+ line = bufferedReader.readLine();
+ lineNum++;
+ }
+
+ bufferedReader.close();
+ System.out.println("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+ }
+
+ private static interface TestDataConsumer {
+ public void addElement(String elementKey, double value);
+
+ public List<Pair<String, Double>> getTopN(int k);
+
+ public long getSpentTime();
+ }
+
+ private class SpaceSavingConsumer implements TopNCounterComparisonTest.TestDataConsumer {
+ private long timeSpent = 0;
+ protected TopNCounter<String> vs;
+
+ public SpaceSavingConsumer() {
+ vs = new TopNCounter<String>(TOP_K * SPACE_SAVING_ROOM);
+
+ }
+
+ public void addElement(String key, double value) {
+ //System.out.println("Adding " + key + ":" + incrementCount);
+ long startTime = System.currentTimeMillis();
+ vs.offer(key, value);
+ timeSpent += (System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public List<Pair<String, Double>> getTopN(int k) {
+ long startTime = System.currentTimeMillis();
+ List<Counter<String>> tops = vs.topK(k);
+ List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+ for (Counter<String> counter : tops)
+ allRecords.add(new Pair(counter.getItem(), counter.getCount()));
+ timeSpent += (System.currentTimeMillis() - startTime);
+ return allRecords;
+ }
+
+ @Override
+ public long getSpentTime() {
+ return timeSpent;
+ }
+ }
+
+ private class HashMapConsumer implements TopNCounterComparisonTest.TestDataConsumer {
+
+ private long timeSpent = 0;
+ private Map<String, Double> hashMap;
+
+ public HashMapConsumer() {
+ hashMap = Maps.newHashMap();
+ }
+
+ public void addElement(String key, double value) {
+ long startTime = System.currentTimeMillis();
+ if (hashMap.containsKey(key)) {
+ hashMap.put(key, hashMap.get(key) + value);
+ } else {
+ hashMap.put(key, value);
+ }
+ timeSpent += (System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public List<Pair<String, Double>> getTopN(int k) {
+ long startTime = System.currentTimeMillis();
+ List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+ for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+ allRecords.add(new Pair(entry.getKey(), entry.getValue()));
+ }
+
+ Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
+ @Override
+ public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
+ return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
+ }
+ });
+ timeSpent += (System.currentTimeMillis() - startTime);
+ return allRecords.subList(0, k);
+ }
+
+ @Override
+ public long getSpentTime() {
+ return timeSpent;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
new file mode 100644
index 0000000..23620d1
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TopNCounterTest {
+
+ private static final int NUM_ITERATIONS = 100000;
+
+ @Test
+ public void testTopNCounter() {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+ for (String i : stream) {
+ vs.offer(i);
+ /*
+ for(String s : vs.poll(3))
+ System.out.print(s+" ");
+ */
+ System.out.println(vs);
+ }
+
+ List<Counter<String>> topk = vs.topK(6);
+
+ for(Counter<String> top : topk) {
+ System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
+ }
+
+ }
+
+ @Test
+ public void testTopK() {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+ List<Counter<String>> topK = vs.topK(3);
+ for (Counter<String> c : topK) {
+ assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+ }
+ }
+
+ @Test
+ public void testTopKWithIncrement() {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i, 10);
+ }
+ List<Counter<String>> topK = vs.topK(3);
+ for (Counter<String> c : topK) {
+ assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+ }
+ }
+
+ @Test
+ public void testTopKWithIncrementOutOfOrder() {
+ TopNCounter<String> vs_increment = new TopNCounter<String>(3);
+ TopNCounter<String> vs_single = new TopNCounter<String>(3);
+ String[] stream = {"A", "B", "C", "D", "A"};
+ Integer[] increments = {15, 20, 25, 30, 1};
+
+ for (int i = 0; i < stream.length; i++) {
+ vs_increment.offer(stream[i], increments[i]);
+ for (int k = 0; k < increments[i]; k++) {
+ vs_single.offer(stream[i]);
+ }
+ }
+ System.out.println("Insert with counts vs. single inserts:");
+ System.out.println(vs_increment);
+ System.out.println(vs_single);
+
+ List<Counter<String>> topK_increment = vs_increment.topK(3);
+ List<Counter<String>> topK_single = vs_single.topK(3);
+
+ for (int i = 0; i < topK_increment.size(); i++) {
+ assertEquals(topK_increment.get(i).getItem(),
+ topK_single.get(i).getItem());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCounterSerialization() throws IOException, ClassNotFoundException {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+ List<Counter<String>> topK = vs.topK(3);
+ for (Counter<String> c : topK) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutput oo = new ObjectOutputStream(baos);
+ oo.writeObject(c);
+ oo.close();
+
+ ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ Counter<String> clone = (Counter<String>) oi.readObject();
+ assertEquals(c.getCount(), clone.getCount(), 0.0001);
+ assertEquals(c.getError(), clone.getError(), 0.0001);
+ assertEquals(c.getItem(), clone.getItem());
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSerialization() throws IOException, ClassNotFoundException {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutput oo = new ObjectOutputStream(baos);
+ oo.writeObject(vs);
+ oo.close();
+
+ ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+
+ assertEquals(vs.toString(), clone.toString());
+ }
+
+
+ @Test
+ public void testByteSerialization() throws IOException, ClassNotFoundException {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+
+ testSerialization(vs);
+
+ // Empty
+ vs = new TopNCounter<String>(0);
+ testSerialization(vs);
+ }
+
+ private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
+ byte[] bytes = vs.toBytes();
+ TopNCounter<String> clone = new TopNCounter<String>(bytes);
+
+ assertEquals(vs.toString(), clone.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7c501760/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 360c6b1..051df4d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
<commons-httpclient.version>3.1</commons-httpclient.version>
<commons-collections4.version>4.0</commons-collections4.version>
<commons-email.version>1.1</commons-email.version>
+ <commons-math3.version>3.6-SNAPSHOT</commons-math3.version>
<!-- Spark -->
<spark.version>1.3.0</spark.version>
@@ -386,6 +387,11 @@
<version>${commons-email.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>${commons-math3.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
[3/7] incubator-kylin git commit: KYLIN-943 Update TopNCounter
serialization to ascending order
Posted by sh...@apache.org.
KYLIN-943 Update TopNCounter serialization to ascending order
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f3fe0e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f3fe0e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f3fe0e50
Branch: refs/heads/KYLIN-943
Commit: f3fe0e509fdf6a87baeb917b7588f390064afa6f
Parents: 6208520
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 1 21:02:02 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/topn/TopNCounter.java | 78 +++----------
.../kylin/common/topn/TopNCounterBasicTest.java | 110 +++----------------
.../serializer/TopNCounterSerializerTest.java | 41 +++++++
3 files changed, 71 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 51fe1b2..69e8d56 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -38,7 +38,7 @@ import java.util.Map;
*
* @param <T> type of data in the stream to be summarized
*/
-public class TopNCounter<T> implements ITopK<T>, Externalizable {
+public class TopNCounter<T> implements ITopK<T> {
public static final int EXTRA_SPACE_RATE = 50;
@@ -101,7 +101,7 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
* @param item stream element (<i>e</i>)
* @return item dropped from summary if an item was dropped, null otherwise
*/
- public T offerReturnDropped(T item, int incrementCount) {
+ public T offerReturnDropped(T item, double incrementCount) {
return offerReturnAll(item, incrementCount).getSecond();
}
@@ -241,28 +241,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
return sb.toString();
}
- @SuppressWarnings("unchecked")
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- this.bucketList = new DoublyLinkedList<Bucket>();
- this.capacity = in.readInt();
-
- int size = in.readInt();
- this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
-
- Bucket currentBucket = null;
- ListNode2<Bucket> currentBucketNode = null;
- for (int i = 0; i < size; i++) {
- Counter<T> c = (Counter<T>) in.readObject();
- if (currentBucket == null || c.count != currentBucket.count) {
- currentBucket = new Bucket(c.count);
- currentBucketNode = bucketList.add(currentBucket);
- }
- c.bucketNode = currentBucketNode;
- counterMap.put(c.item, currentBucket.counterList.add(c));
- }
- }
-
public void fromExternal(int size, double[] counters, List<T> items) {
this.bucketList = new DoublyLinkedList<Bucket>();
@@ -283,18 +261,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
}
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(this.capacity);
- out.writeInt(this.size());
- for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- out.writeObject(c);
- }
- }
- }
-
/**
* For de-serialization
*/
@@ -302,30 +268,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
/**
- * For de-serialization
- *
- * @param bytes
- * @throws java.io.IOException
- * @throws ClassNotFoundException
- */
- public TopNCounter(byte[] bytes) throws IOException, ClassNotFoundException {
- fromBytes(bytes);
- }
-
- public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
- readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
- }
-
- public byte[] toBytes() throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(baos);
- this.writeExternal(out);
- out.flush();
- return baos.toByteArray();
-
- }
-
- /**
* Merge another counter into this counter; Note, the other counter will be changed in this method; please make a copy and passed in here;
* @param another
* @return
@@ -387,12 +329,16 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
}
-
+
+ /**
+ * Get the counter values in ascending order
+ * @return
+ */
public double[] getCounters() {
double[] counters = new double[size()];
int index = 0;
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
Bucket b = bNode.getValue();
for (Counter<T> c : b.counterList) {
counters[index] = c.count;
@@ -402,10 +348,14 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
return counters;
}
-
+
+ /**
+ * Get the item list order by counter values in ascending order
+ * @return
+ */
public List<T> getItems() {
List<T> items = Lists.newArrayList();
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
Bucket b = bNode.getValue();
for (Counter<T> c : b.counterList) {
items.add(c.item);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index e25f651..252e955 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -16,45 +16,36 @@
package org.apache.kylin.common.topn;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
import org.junit.Test;
-import java.io.*;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-
public class TopNCounterBasicTest {
@Test
public void testTopNCounter() {
TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y" };
for (String i : stream) {
- vs.offer(i);
- /*
- for(String s : vs.poll(3))
- System.out.print(s+" ");
- */
- System.out.println(vs);
+ vs.offer(i);
}
List<Counter<String>> topk = vs.topK(6);
-
- for(Counter<String> top : topk) {
+
+ for (Counter<String> top : topk) {
System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
}
-
+
}
@Test
public void testTopK() {
TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
for (String i : stream) {
vs.offer(i);
}
@@ -67,7 +58,7 @@ public class TopNCounterBasicTest {
@Test
public void testTopKWithIncrement() {
TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
for (String i : stream) {
vs.offer(i, 10);
}
@@ -81,8 +72,8 @@ public class TopNCounterBasicTest {
public void testTopKWithIncrementOutOfOrder() {
TopNCounter<String> vs_increment = new TopNCounter<String>(3);
TopNCounter<String> vs_single = new TopNCounter<String>(3);
- String[] stream = {"A", "B", "C", "D", "A"};
- Integer[] increments = {15, 20, 25, 30, 1};
+ String[] stream = { "A", "B", "C", "D", "A" };
+ Integer[] increments = { 15, 20, 25, 30, 1 };
for (int i = 0; i < stream.length; i++) {
vs_increment.offer(stream[i], increments[i]);
@@ -98,102 +89,33 @@ public class TopNCounterBasicTest {
List<Counter<String>> topK_single = vs_single.topK(3);
for (int i = 0; i < topK_increment.size(); i++) {
- assertEquals(topK_increment.get(i).getItem(),
- topK_single.get(i).getItem());
+ assertEquals(topK_increment.get(i).getItem(), topK_single.get(i).getItem());
}
}
- @SuppressWarnings("unchecked")
- @Test
- public void testCounterSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
- }
- List<Counter<String>> topK = vs.topK(3);
- for (Counter<String> c : topK) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutput oo = new ObjectOutputStream(baos);
- oo.writeObject(c);
- oo.close();
-
- ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- Counter<String> clone = (Counter<String>) oi.readObject();
- assertEquals(c.getCount(), clone.getCount(), 0.0001);
- assertEquals(c.getError(), clone.getError(), 0.0001);
- assertEquals(c.getItem(), clone.getItem());
- }
- }
-
-
- @SuppressWarnings("unchecked")
- @Test
- public void testSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
- }
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutput oo = new ObjectOutputStream(baos);
- oo.writeObject(vs);
- oo.close();
-
- ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
-
- assertEquals(vs.toString(), clone.toString());
- }
-
-
- @Test
- public void testByteSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
- }
-
- testSerialization(vs);
-
- // Empty
- vs = new TopNCounter<String>(0);
- testSerialization(vs);
- }
-
- private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
- byte[] bytes = vs.toBytes();
- TopNCounter<String> clone = new TopNCounter<String>(bytes);
-
- assertEquals(vs.toString(), clone.toString());
- }
-
@Test
public void testRetain() {
TopNCounter<String> vs = new TopNCounter<String>(10);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
for (String i : stream) {
vs.offer(i);
}
-
+
vs.retain(5);
assertTrue(vs.size() <= 5);
assertTrue(vs.getCapacity() <= 5);
}
-
+
@Test
public void testMerge() {
TopNCounter<String> vs = new TopNCounter<String>(10);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B" };
for (String i : stream) {
vs.offer(i);
}
-
- String[] stream2 = {"B", "B", "Z", "Z", "B", "C", "X", "X"};
+ String[] stream2 = { "B", "B", "Z", "Z", "B", "C", "X", "X" };
TopNCounter<String> vs2 = new TopNCounter<String>(10);
for (String i : stream2) {
vs2.offer(i);
@@ -204,6 +126,6 @@ public class TopNCounterBasicTest {
for (Counter<String> c : topK) {
assertTrue(Arrays.asList("A", "B", "X").contains(c.getItem()));
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
new file mode 100644
index 0000000..050193b
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.model.DataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Round-trip test for {@link TopNCounterSerializer}: a TopNCounter written into a ByteBuffer
+ * and read back must have the same string form, and reading must consume exactly the bytes
+ * that were written.
+ */
+public class TopNCounterSerializerTest {
+
+    private static final TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getInstance("topn(10)"));
+
+    @Test
+    public void testCounterSerialization() {
+        TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
+        Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
+        for (Integer i : stream) {
+            vs.offer(new ByteArray(Bytes.toBytes(i)));
+        }
+
+        ByteBuffer out = ByteBuffer.allocate(1024 * 1024);
+        serializer.serialize(vs, out);
+
+        // keep only the bytes actually written, so a reader that over-reads is detected below
+        byte[] copyBytes = new byte[out.position()];
+        System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
+
+        ByteBuffer in = ByteBuffer.wrap(copyBytes);
+        TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
+
+        Assert.assertEquals(vs.toString(), vsNew.toString());
+        // serialize and deserialize must agree on the encoded length
+        Assert.assertEquals(0, in.remaining());
+    }
+
+}
[5/7] incubator-kylin git commit: KYLIN-943 Update Aggregator,
DataType, FunctionDesc etc for TopN
Posted by sh...@apache.org.
KYLIN-943 Update Aggregator, DataType, FunctionDesc etc for TopN
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/196a537f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/196a537f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/196a537f
Branch: refs/heads/KYLIN-943
Commit: 196a537f8cd0739f45971fc86e2d3c5bdad8accb
Parents: 5980d95
Author: shaofengshi <sh...@apache.org>
Authored: Wed Aug 26 17:47:38 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/topn/Counter.java | 4 +-
.../kylin/common/topn/TopNCounterBasicTest.java | 39 ++++++++++++-
.../common/topn/TopNCounterCombinationTest.java | 38 ++++++------
.../kylin/common/topn/TopNCounterTest.java | 10 ++--
.../metadata/measure/MeasureAggregator.java | 2 +
.../kylin/metadata/measure/TopNAggregator.java | 60 +++++++++++++++++++
.../serializer/TopNCounterSerializer.java | 61 ++++++++++++++++++++
.../apache/kylin/metadata/model/DataType.java | 8 ++-
.../kylin/metadata/model/FunctionDesc.java | 9 ++-
.../kylin/metadata/model/ParameterDesc.java | 54 ++++++++---------
10 files changed, 226 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 0f7d8de..866d3d8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -72,14 +72,14 @@ public class Counter<T> implements Externalizable {
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
item = (T) in.readObject();
count = in.readDouble();
- error = in.readDouble();
+ //error = in.readDouble();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(item);
out.writeDouble(count);
- out.writeDouble(error);
+ //out.writeDouble(error);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 0d59314..e25f651 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -30,8 +30,6 @@ import static org.junit.Assert.assertTrue;
public class TopNCounterBasicTest {
- private static final int NUM_ITERATIONS = 100000;
-
@Test
public void testTopNCounter() {
TopNCounter<String> vs = new TopNCounter<String>(3);
@@ -171,4 +169,41 @@ public class TopNCounterBasicTest {
assertEquals(vs.toString(), clone.toString());
}
+
+    /**
+     * After retain(n), both the number of tracked items and the capacity are at most n.
+     */
+    @Test
+    public void testRetain() {
+        TopNCounter<String> counter = new TopNCounter<String>(10);
+        String[] items = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String item : items) {
+            counter.offer(item);
+        }
+
+        counter.retain(5);
+        assertTrue(counter.size() <= 5);
+        assertTrue(counter.getCapacity() <= 5);
+    }
+
+    /**
+     * Merging two counters that both have spare capacity must rank the combined counts.
+     * Only the top 2 are asserted because A, C and Z tie for third place after the merge.
+     */
+    @Test
+    public void testMerge() {
+        // first stream counts: X=4, A=3, B=2, C=2, Y=1, Z=1
+        TopNCounter<String> vs = new TopNCounter<String>(10);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        // second stream counts: B=3, Z=2, X=2, C=1
+        String[] stream2 = {"B", "B", "Z", "Z", "B", "C", "X", "X"};
+        TopNCounter<String> vs2 = new TopNCounter<String>(10);
+        for (String i : stream2) {
+            vs2.offer(i);
+        }
+        // merged: X=4+2=6, B=2+3=5, A=3+0=3, C=2+1=3, Z=1+2=3, Y=1+0=1
+        vs.merge(vs2);
+        List<Counter<String>> topK = vs.topK(2);
+        assertEquals(2, topK.size());
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("B", "X").contains(c.getItem()));
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index afde8f7..cc0557e 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -29,37 +29,33 @@ public class TopNCounterCombinationTest extends TopNCounterTest {
public static Collection<Integer[]> configs() {
// return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
return Arrays.asList(new Integer[][] {
- /*
// with 20X space
- { 100, 10, 20 }, // top 100 among 1,000 keys (top 10%)
- { 100, 20, 20 }, // top 100 among 2,000 keys (top 5%)
- { 100, 100, 20 }, // top 100 among 10,000 keys (top 1%)
- { 100, 1000, 20 }, // top 100 among 100,000 keys (top 0.1%)
-
- */
+ { 10, 20 }, // top 10%
+ { 20, 20 }, // top 5%
+ { 100, 20 }, // top 1%
+ { 1000, 20 }, // top 0.1%
+
// with 50X space
- { 100, 10, 50 }, // top 100 among 1,000 keys (top 10%)
- { 100, 20, 50 }, // top 100 among 2,000 keys (top 5%)
- { 100, 100, 50 }, // top 100 among 10,000 keys (top 1%)
- { 100, 1000, 50 }, // top 100 among 100,000 keys (top 0.1%)
+ { 10, 50 }, // top 10%
+ { 20, 50 }, // top 5%
+ { 100, 50 }, // top 1%
+ { 1000, 50 }, // top 0.1%
- /*
// with 100X space
- { 100, 10, 100 }, // top 100 among 1,000 keys (top 10%)
- { 100, 20, 100 }, // top 100 among 2,000 keys (top 5%)
- { 100, 100, 100 }, // top 100 among 10,000 keys (top 1%)
- { 100, 1000, 100 }, // top 100 among 100,000 keys (top 0.1%)
- */
+ { 10, 100 }, // top 10%
+ { 20, 100 }, // top 5%
+ { 100, 100 }, // top 1%
+ { 1000, 100 }, // top 0.1%
});
}
- public TopNCounterCombinationTest(int topK, int keySpaceRate, int spaceSavingRate) throws Exception {
+ public TopNCounterCombinationTest(int keySpaceRate, int spaceSavingRate) throws Exception {
super();
- this.TOP_K = topK;
+ this.TOP_K = 100;
this.KEY_SPACE = TOP_K * keySpaceRate;
this.SPACE_SAVING_ROOM = spaceSavingRate;
TOTAL_RECORDS = 1000000; // 1 million
- this.PARALLEL = 50;
- this.verbose = false;
+ this.PARALLEL = 10;
+ this.verbose = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index e5350ba..edc9ce2 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -41,7 +41,7 @@ public class TopNCounterTest {
protected static int SPACE_SAVING_ROOM;
protected static int PARALLEL = 10;
-
+
protected static boolean verbose = true;
public TopNCounterTest() {
@@ -60,7 +60,7 @@ public class TopNCounterTest {
outputMsg("Start to create test random data...");
long startTime = System.currentTimeMillis();
- ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+ ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE, 0.5);
int keyIndex;
File tempFile = File.createTempFile("ZipfDistribution", ".txt");
@@ -70,7 +70,7 @@ public class TopNCounterTest {
FileWriter fw = new FileWriter(tempFile);
try {
for (int i = 0; i < TOTAL_RECORDS; i++) {
- keyIndex = zipf.sample();
+ keyIndex = zipf.sample() -1;
fw.write(allKeys[keyIndex]);
fw.write('\n');
}
@@ -85,7 +85,7 @@ public class TopNCounterTest {
return tempFile.getAbsolutePath();
}
- @Test
+ //@Test
public void testSingleSpaceSaving() throws IOException {
String dataFile = prepareTestDate();
TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
@@ -163,7 +163,7 @@ public class TopNCounterTest {
if (consumers.length == 1)
return consumers;
- TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * 10);
+ TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * PARALLEL);
for (int i=0, n=consumers.length; i<n; i++) {
merged.vs.merge(consumers[i].vs);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
index 4153cbd..b533044 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -55,6 +55,8 @@ abstract public class MeasureAggregator<V> {
return new BigDecimalMinAggregator();
else if (isDouble(returnType))
return new DoubleMinAggregator();
+ } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
+ return new TopNAggregator();
}
throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
new file mode 100644
index 0000000..9cea4cc
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+
+/**
+ *
+ */
+public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
+
+    // capacity to trim the merged result back to; taken from the first value aggregated after a reset
+    int capacity = 0;
+    // running merge of the aggregated values; null until the first value arrives after a reset
+    TopNCounter<ByteArray> sum = null;
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    /**
+     * Merges {@code value} into the running result.
+     * NOTE(review): TopNCounter.merge documents that its argument is changed by the merge,
+     * so {@code value} may be modified here; confirm callers do not reuse it.
+     */
+    @Override
+    public void aggregate(TopNCounter<ByteArray> value) {
+        if (sum == null) {
+            // unbounded capacity so nothing is evicted while merging; trimmed in getState()
+            sum = new TopNCounter<ByteArray>(Integer.MAX_VALUE);
+            capacity = value.getCapacity();
+        }
+
+        sum.merge(value);
+    }
+
+    /**
+     * Returns the merged counter trimmed to the input capacity, or null when nothing has been
+     * aggregated since the last reset.
+     * NOTE(review): retain() also lowers sum's capacity, so values aggregated after this call
+     * are merged into a bounded counter; confirm getState() is called only once per reset.
+     */
+    @Override
+    public TopNCounter<ByteArray> getState() {
+        if (sum == null) {
+            return null;
+        }
+        sum.retain(capacity);
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        // rough estimate: 8 bytes (one double count) per retained entry; item bytes are not counted
+        return 8 * capacity;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
new file mode 100644
index 0000000..1c6c442
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.model.DataType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer for TopN measure values ({@code TopNCounter<ByteArray>}).
+ *
+ * NOTE(review): every method in this version is a placeholder. serialize() writes nothing,
+ * deserialize() and valueOf() return null, and peekLength() and maxLength() return 0. Any
+ * value passed through this class is silently lost. TODO implement before relying on it.
+ * NOTE(review): no constructor taking a DataType is declared here, while
+ * TopNCounterSerializerTest constructs one with DataType.getInstance("topn(10)").
+ */
+public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
+
+ // be thread-safe and avoid repeated obj creation
+ // NOTE(review): not read anywhere yet; presumably intended for reuse in deserialize()
+ private ThreadLocal<TopNCounter<ByteArray>> current = new ThreadLocal<TopNCounter<ByteArray>>();
+
+ // TODO placeholder: always reports 0 bytes
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return 0;
+ }
+
+ // TODO placeholder: always reports 0 bytes
+ @Override
+ public int maxLength() {
+ return 0;
+ }
+
+ // TODO placeholder: ignores the input and returns null
+ @Override
+ public TopNCounter<ByteArray> valueOf(byte[] value) {
+ return null;
+ }
+
+ // TODO placeholder: writes nothing to out
+ @Override
+ public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+
+ }
+
+ // TODO placeholder: reads nothing from in and returns null
+ @Override
+ public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
index 9a89499..74ae1d4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
@@ -35,7 +35,7 @@ public class DataType {
public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
+ "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
- + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc" //
+ + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn" //
+ "|" + TblColRef.InnerDataTypeEnum.LITERAL.getDataType() //
+ "|" + TblColRef.InnerDataTypeEnum.DERIVED.getDataType();
@@ -183,6 +183,8 @@ public class DataType {
return 8;
} else if (isHLLC()) {
return 1 << precision;
+ } else if (isTopN()) {
+ return 8 * precision * 50;
}
throw new IllegalStateException("The return type : " + name + " is not recognized;");
}
@@ -250,6 +252,10 @@ public class DataType {
public boolean isHLLC() {
return name.equals("hllc");
}
+
+ /**
+ * Whether this data type is the TopN type, i.e. its name is exactly "topn" (case-sensitive).
+ */
+ public boolean isTopN() {
+ return name.equals("topn");
+ }
public String getName() {
return name;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index e3f4c48..b87d50c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -34,10 +34,11 @@ public class FunctionDesc {
public static final String FUNC_MAX = "MAX";
public static final String FUNC_COUNT = "COUNT";
public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+ public static final String FUNC_TOP_N = "TOP_N";
public static final String PARAMTER_TYPE_CONSTANT = "constant";
public static final String PARAMETER_TYPE_COLUMN = "column";
-
+
@JsonProperty("expression")
private String expression;
@JsonProperty("parameter")
@@ -92,6 +93,10 @@ public class FunctionDesc {
return FUNC_COUNT_DISTINCT.equalsIgnoreCase(expression);
}
+ /**
+ * Whether this function is TOP_N; the expression is compared case-insensitively.
+ */
+ public boolean isTopN() {
+ return FUNC_TOP_N.equalsIgnoreCase(expression);
+ }
+
public boolean isHolisticCountDistinct() {
if (isCountDistinct() && returnDataType != null && returnDataType.isBigInt()) {
return true;
@@ -138,7 +143,7 @@ public class FunctionDesc {
}
public DataType getSQLType() {
- if (isCountDistinct())
+ if (isCountDistinct() || isTopN())
return DataType.ANY;
else if (isSum() || isMax() || isMin())
return parameter.getColRefs().get(0).getType();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/196a537f/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 1565862..bdb4b91 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -39,6 +39,9 @@ public class ParameterDesc {
private String type;
@JsonProperty("value")
private String value;
+
+ @JsonProperty("counter")
+ private String counter;
private List<TblColRef> colRefs;
@@ -62,6 +65,14 @@ public class ParameterDesc {
this.value = value;
}
+    /** Returns the value bound to the JSON "counter" property; may be null. */
+    public String getCounter() {
+        return this.counter;
+    }
+
+    /** Sets the value bound to the JSON "counter" property. */
+    public void setCounter(String newCounter) {
+        counter = newCounter;
+    }
+
public List<TblColRef> getColRefs() {
return colRefs;
}
@@ -85,39 +96,30 @@ public class ParameterDesc {
}
@Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((type == null) ? 0 : type.hashCode());
- result = prime * result + ((value == null) ? 0 : value.hashCode());
- return result;
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ParameterDesc that = (ParameterDesc) o;
+
+ if (counter != null ? !counter.equals(that.counter) : that.counter != null) return false;
+ if (type != null ? !type.equals(that.type) : that.type != null) return false;
+ if (value != null ? !value.equals(that.value) : that.value != null) return false;
+
+ return true;
}
@Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ParameterDesc other = (ParameterDesc) obj;
- if (type == null) {
- if (other.type != null)
- return false;
- } else if (!type.equals(other.type))
- return false;
- if (value == null) {
- if (other.value != null)
- return false;
- } else if (!value.equals(other.value))
- return false;
- return true;
+ public int hashCode() {
+ int result = type != null ? type.hashCode() : 0;
+ result = 31 * result + (value != null ? value.hashCode() : 0);
+ result = 31 * result + (counter != null ? counter.hashCode() : 0);
+ return result;
}
@Override
public String toString() {
- return "ParameterDesc [type=" + type + ", value=" + value + "]";
+ return "ParameterDesc [type=" + type + ", value=" + value + ", counter=" + counter + "]";
}
}
[4/7] incubator-kylin git commit: KYLIN-943 update query/storage
engine to support TopN
Posted by sh...@apache.org.
KYLIN-943 update query/storage engine to support TopN
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/38201be2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/38201be2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/38201be2
Branch: refs/heads/KYLIN-943
Commit: 38201be298772dae4f2a72d2cc463dc7b857f090
Parents: af4883e
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 17 14:05:06 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/topn/TopNCounter.java | 55 ++++++++++--
.../kylin/cube/CubeCapabilityChecker.java | 60 +++++++++++--
.../apache/kylin/cube/CubeDimensionDeriver.java | 10 ++-
.../org/apache/kylin/cube/model/CubeDesc.java | 6 +-
.../kylin/metadata/model/FunctionDesc.java | 14 ++-
.../kylin/metadata/realization/SQLDigest.java | 13 ++-
.../apache/kylin/storage/StorageContext.java | 15 +---
.../kylin/storage/hybrid/HybridInstance.java | 4 +-
.../org/apache/kylin/storage/tuple/Tuple.java | 3 +
.../kylin/storage/cache/DynamicCacheTest.java | 4 +-
.../kylin/storage/cache/StaticCacheTest.java | 4 +-
.../cube_desc/test_kylin_cube_topn_desc.json | 2 +-
.../localmeta/data/DEFAULT.STREAMING_TABLE.csv | 0
.../test_kylin_inner_join_model_desc.json | 3 +-
.../test_kylin_left_join_model_desc.json | 3 +-
.../kylin/job/BuildCubeWithEngineTest.java | 6 +-
.../apache/kylin/query/relnode/OLAPContext.java | 18 +++-
.../kylin/query/relnode/OLAPLimitRel.java | 1 +
.../apache/kylin/query/relnode/OLAPSortRel.java | 11 +--
.../kylin/query/test/ITKylinQueryTest.java | 2 +-
query/src/test/resources/query/sql/query81.sql | 26 ++++++
query/src/test/resources/query/sql/query82.sql | 26 ++++++
.../cube/v1/CubeSegmentTopNTupleIterator.java | 86 ++++++++++++++++++
.../hbase/cube/v1/CubeSegmentTupleIterator.java | 35 ++++----
.../storage/hbase/cube/v1/CubeStorageQuery.java | 92 +++++++++++++++-----
.../hbase/cube/v1/CubeTupleConverter.java | 71 ++++++++++++++-
.../cube/v1/SerializedHBaseTupleIterator.java | 9 +-
.../storage/hbase/common/ITStorageTest.java | 4 +-
28 files changed, 496 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 69e8d56..6814b8d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -23,10 +23,7 @@ import org.apache.kylin.common.util.Pair;
import java.io.*;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
@@ -38,10 +35,11 @@ import java.util.Map;
*
* @param <T> type of data in the stream to be summarized
*/
-public class TopNCounter<T> implements ITopK<T> {
+public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public static final int EXTRA_SPACE_RATE = 50;
+
protected class Bucket {
protected DoublyLinkedList<Counter<T>> counterList;
@@ -365,4 +363,51 @@ public class TopNCounter<T> implements ITopK<T> {
return items;
}
+
+    /**
+     * Returns an iterator over every counter held by this TopNCounter, visiting buckets from
+     * {@code bucketList.head()} along {@code getPrev()} links. Removal is not supported.
+     */
+    @Override
+    public Iterator<Counter<T>> iterator() {
+        return new TopNCounterIterator();
+    }
+
+    /**
+     * Bucket-by-bucket iterator over the counters. Nodes whose bucket value is null are
+     * skipped. {@link #next()} follows the {@link Iterator} contract: it advances across
+     * bucket boundaries by itself and throws {@link NoSuchElementException} once exhausted.
+     */
+    private class TopNCounterIterator implements Iterator<Counter<T>> {
+
+        private ListNode2<Bucket> currentBNode;
+        private Iterator<Counter<T>> currentCounterIterator;
+
+        private TopNCounterIterator() {
+            currentBNode = bucketList.head();
+            currentCounterIterator = countersOf(currentBNode);
+        }
+
+        /** Iterator over one bucket node's counters; empty for a null node or a null bucket. */
+        private Iterator<Counter<T>> countersOf(ListNode2<Bucket> node) {
+            if (node == null || node.getValue() == null) {
+                return Collections.<Counter<T>> emptyIterator();
+            }
+            return node.getValue().counterList.iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            // walk iteratively (not recursively) until a bucket with a remaining counter is
+            // found or the bucket list is exhausted
+            while (!currentCounterIterator.hasNext()) {
+                if (currentBNode == null) {
+                    return false;
+                }
+                currentBNode = currentBNode.getPrev();
+                currentCounterIterator = countersOf(currentBNode);
+            }
+            return true;
+        }
+
+        @Override
+        public Counter<T> next() {
+            // hasNext() moves to the next non-empty bucket, so next() works without a prior hasNext()
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return currentCounterIterator.next();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 77c3298..628340e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -18,19 +18,20 @@
package org.apache.kylin.cube;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
/**
*/
public class CubeCapabilityChecker {
@@ -39,10 +40,11 @@ public class CubeCapabilityChecker {
public static boolean check(CubeInstance cube, SQLDigest digest, boolean allowWeakMatch) {
// retrieve members from olapContext
- Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest.groupbyColumns, digest.filterColumns);
+ Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest);
Collection<FunctionDesc> functions = digest.aggregations;
Collection<TblColRef> metricsColumns = digest.metricColumns;
Collection<JoinDesc> joins = digest.joinDescs;
+ boolean hasTopN = hasTopNMeasure(cube.getDescriptor());
// match dimensions & aggregations & joins
@@ -62,6 +64,13 @@ public class CubeCapabilityChecker {
}
}
+ // for topn, the group column can come from measure
+ if (hasTopN & matchJoin && !matchDimensions && functions.size() == 1) {
+ boolean matchedTopN = isMatchedWithTopN(dimensionColumns, cube, digest);
+ matchDimensions = matchedTopN;
+ matchAggregation = matchedTopN;
+ }
+
if (!isOnline || !matchDimensions || !matchAggregation || !matchJoin) {
logger.info("Exclude cube " + cube.getName() + " because " + " isOnlne=" + isOnline + ",matchDimensions=" + matchDimensions + ",matchAggregation=" + matchAggregation + ",matchJoin=" + matchJoin);
return false;
@@ -70,6 +79,47 @@ public class CubeCapabilityChecker {
return true;
}
+    /**
+     * Checks whether a single-SUM query can be answered by one of the cube's TopN measures.
+     * For each TopN measure, the measure's last parameter column (its display column) is
+     * treated as supplied by the measure rather than by a cube dimension. The query matches
+     * when the remaining dimension columns are all cube dimensions and the TopN function is
+     * compatible with the query's aggregation.
+     *
+     * @param dimensionColumns the query's dimension (group-by and filter) columns
+     * @param cube the candidate cube
+     * @param digest the query digest; only its first aggregation is considered
+     * @return true if some TopN measure can serve the query
+     */
+    private static boolean isMatchedWithTopN(Collection<TblColRef> dimensionColumns, CubeInstance cube, SQLDigest digest) {
+        Collection<FunctionDesc> functions = digest.aggregations;
+        if (functions == null || functions.isEmpty()) {
+            return false;
+        }
+
+        FunctionDesc onlyFunction = functions.iterator().next();
+        if (!onlyFunction.isSum()) {
+            // topN only support SUM expression
+            return false;
+        }
+
+        CubeDesc cubeDesc = cube.getDescriptor();
+        Collection<TblColRef> dimensionColumnsCopy = new ArrayList<TblColRef>(dimensionColumns);
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            FunctionDesc measureFunction = measure.getFunction();
+            if (!measureFunction.isTopN()) {
+                continue;
+            }
+
+            List<TblColRef> cols = measureFunction.getParameter().getColRefs();
+            TblColRef displayCol = cols.get(cols.size() - 1);
+            // the display column is provided by the TopN measure itself, not by a cube dimension
+            boolean removed = dimensionColumnsCopy.remove(displayCol);
+            if (isMatchedWithDimensions(dimensionColumnsCopy, cube) && measureFunction.isCompatible(onlyFunction)) {
+                return true;
+            }
+            // restore the copy for the next measure; re-add only what was actually removed,
+            // otherwise a column the query never referenced would leak into later checks
+            if (removed) {
+                dimensionColumnsCopy.add(displayCol);
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Reports whether at least one measure of the given cube descriptor uses a TopN function.
+     *
+     * @param cubeDesc the cube descriptor whose measures are scanned
+     * @return true as soon as a TopN measure is found, false otherwise
+     */
+    private static boolean hasTopNMeasure(CubeDesc cubeDesc) {
+        boolean found = false;
+        for (MeasureDesc candidate : cubeDesc.getMeasures()) {
+            found = candidate.getFunction().isTopN();
+            if (found) {
+                break;
+            }
+        }
+        return found;
+    }
+
private static boolean isMatchedWithDimensions(Collection<TblColRef> dimensionColumns, CubeInstance cube) {
CubeDesc cubeDesc = cube.getDescriptor();
boolean matchAgg = cubeDesc.listDimensionColumnsIncludingDerived().containsAll(dimensionColumns);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
index a746c99..138d01e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
@@ -21,7 +21,9 @@ package org.apache.kylin.cube;
import java.util.Collection;
import java.util.HashSet;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
/**
*
@@ -29,7 +31,13 @@ import org.apache.kylin.metadata.model.TblColRef;
*/
public class CubeDimensionDeriver {
- public static Collection<TblColRef> getDimensionColumns(Collection<TblColRef> groupByColumns, Collection<TblColRef> filterColumns) {
+ public static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
+ Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
+ Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
+
+ Collection<MeasureDesc> sortMeasures = sqlDigest.sortMeasures;
+ Collection<SQLDigest.OrderEnum> sortOrders = sqlDigest.sortOrders;
+
Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
dimensionColumns.addAll(groupByColumns);
dimensionColumns.addAll(filterColumns);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 90b5474..9cbdfae 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -129,6 +129,7 @@ public class CubeDesc extends RootPersistentEntity {
private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>();
+
private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>();
private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
@@ -827,5 +828,8 @@ public class CubeDesc extends RootPersistentEntity {
}
return result;
}
-
+
+    /**
+     * Returns the measure display columns of this cube descriptor. The internal set itself is
+     * returned (not a copy), so callers should treat it as read-only.
+     */
+    public LinkedHashSet<TblColRef> getMeasureDisplayColumns() {
+        final LinkedHashSet<TblColRef> columns = this.measureDisplayColumns;
+        return columns;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index b87d50c..d10f395 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -60,7 +60,7 @@ public class FunctionDesc {
}
public boolean needRewrite() {
- return !isSum() && !isDimensionAsMetric();
+ return !isSum() && !isDimensionAsMetric() && !isTopN();
}
public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
@@ -225,4 +225,16 @@ public class FunctionDesc {
return "FunctionDesc [expression=" + expression + ", parameter=" + parameter + ", returnType=" + returnType + "]";
}
+ public boolean isCompatible(FunctionDesc another) {
+ if (another == null) {
+ return false;
+ }
+
+ if (this.isTopN() && another.isSum()) {
+ if (this.getParameter().getColRefs().get(0).equals(another.getParameter().getColRefs().get(0)))
+ return true;
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index 7811858..e48cebe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -19,15 +19,22 @@
package org.apache.kylin.metadata.realization;
import java.util.Collection;
+import java.util.List;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
/**
*/
public class SQLDigest {
+
+    /** Direction of a sort requested on a measure. */
+    public enum OrderEnum {
+        /** Sort in ascending order. */
+        ASCENDING,
+        /** Sort in descending order. */
+        DESCENDING
+    }
+
public String factTable;
public TupleFilter filter;
public Collection<JoinDesc> joinDescs;
@@ -36,9 +43,11 @@ public class SQLDigest {
public Collection<TblColRef> filterColumns;
public Collection<TblColRef> metricColumns;
public Collection<FunctionDesc> aggregations;
+ public Collection<MeasureDesc> sortMeasures;
+ public Collection<OrderEnum> sortOrders;
public SQLDigest(String factTable, TupleFilter filter, Collection<JoinDesc> joinDescs, Collection<TblColRef> allColumns, //
- Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc) {
+ Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc, Collection<MeasureDesc> sortMeasures, Collection<OrderEnum> sortOrders) {
this.factTable = factTable;
this.filter = filter;
this.joinDescs = joinDescs;
@@ -47,6 +56,8 @@ public class SQLDigest {
this.filterColumns = filterColumns;
this.metricColumns = aggregatedColumns;
this.aggregations = aggregateFunnc;
+ this.sortMeasures = sortMeasures;
+ this.sortOrders = sortOrders;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 8b1b706..1643aa4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.realization.SQLDigest;
/**
* @author xjiang
@@ -32,17 +33,12 @@ public class StorageContext {
public static final int DEFAULT_THRESHOLD = 1000000;
- public enum OrderEnum {
- ASCENDING, DESCENDING
- }
private String connUrl;
private int threshold;
private int limit;
private int offset;
private boolean hasSort;
- private List<MeasureDesc> sortMeasures;
- private List<OrderEnum> sortOrders;
private boolean acceptPartialResult;
private boolean exactAggregation;
@@ -59,8 +55,6 @@ public class StorageContext {
this.totalScanCount = new AtomicLong();
this.cuboid = null;
this.hasSort = false;
- this.sortOrders = new ArrayList<OrderEnum>();
- this.sortMeasures = new ArrayList<MeasureDesc>();
this.exactAggregation = false;
this.enableLimit = false;
@@ -110,13 +104,6 @@ public class StorageContext {
return this.enableLimit;
}
- public void addSort(MeasureDesc measure, OrderEnum order) {
- if (measure != null) {
- sortMeasures.add(measure);
- sortOrders.add(order);
- }
- }
-
public void markSort() {
this.hasSort = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 6ad27d5..0c30a3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -180,7 +180,9 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
@Override
public DataModelDesc getDataModelDesc() {
- return this.getLatestRealization().getDataModelDesc();
+ if (this.getLatestRealization() != null)
+ return this.getLatestRealization().getDataModelDesc();
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 0ffae69..11b03bd 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -25,6 +25,7 @@ import java.util.List;
import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.metadata.measure.DoubleMutable;
import org.apache.kylin.metadata.measure.LongMutable;
@@ -65,6 +66,8 @@ public class Tuple implements ITuple {
ret.values[i] = null;
} else if (this.values[i] instanceof HyperLogLogPlusCounter) {
ret.values[i] = new HyperLogLogPlusCounter((HyperLogLogPlusCounter) this.values[i]);
+ } else if (this.values[i] instanceof TopNCounter) {
+ ret.values[i] = null;
} else {
ret.values[i] = this.values[i];
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
index 309d67f..161cad6 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
@@ -1,5 +1,6 @@
package org.apache.kylin.storage.cache;
+import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -8,6 +9,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.IdentityUtils;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
@@ -76,7 +78,7 @@ public class DynamicCacheTest {
final List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
final TupleInfo tupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);
- SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", null, null, Lists.<TblColRef> newArrayList(), groups, Lists.newArrayList(partitionCol), Lists.<TblColRef> newArrayList(), aggregations);
+ SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", null, null, Lists.<TblColRef> newArrayList(), groups, Lists.newArrayList(partitionCol), Lists.<TblColRef> newArrayList(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
ITuple aTuple = new TsOnlyTuple(partitionCol, "2011-02-01");
ITuple bTuple = new TsOnlyTuple(partitionCol, "2012-02-01");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
index e54e3e0..48b0b1d 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
@@ -1,5 +1,6 @@
package org.apache.kylin.storage.cache;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
@@ -8,6 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.util.IdentityUtils;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
@@ -34,7 +36,7 @@ public class StaticCacheTest {
final List<TblColRef> groups = StorageMockUtils.buildGroups();
final List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
final TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
- final SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+ final SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
final TupleInfo tupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);
final List<ITuple> ret = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
index 2e0e376..96c3ace 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
@@ -111,7 +111,7 @@
]
},
"last_modified": 1422435345330,
- "model_name": "test_kylin_inner_join_model_desc",
+ "model_name": "test_kylin_left_join_model_desc",
"null_string": null,
"hbase_mapping": {
"column_family": [
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
index 86f8169..a28684f 100644
--- a/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
@@ -36,7 +36,8 @@
"columns": [
"lstg_format_name",
"LSTG_SITE_ID",
- "SLR_SEGMENT_CD"
+ "SLR_SEGMENT_CD",
+ "seller_id"
]
},
{
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
index d05a08f..c26ffc5 100644
--- a/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
@@ -47,7 +47,8 @@
"columns": [
"lstg_format_name",
"LSTG_SITE_ID",
- "SLR_SEGMENT_CD"
+ "SLR_SEGMENT_CD",
+ "seller_id"
]
},
{
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 7c6b028..4564ccc 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -120,13 +120,13 @@ public class BuildCubeWithEngineTest {
@Test
public void test() throws Exception {
DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
- testInner();
+// testInner();
testLeft();
}
private void testInner() throws Exception {
- String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
-// String[] testCase = new String[] { "testInnerJoinTopNCube" };
+ String[] testCase = new String[] { "testInnerJoinTopNCube" };
+ // String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
runTestAndAssertSucceed(testCase);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 378221c..6865457 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -26,11 +26,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.SQLDigest;
@@ -81,6 +83,8 @@ public class OLAPContext {
public OLAPContext(int seq) {
this.id = seq;
this.storageContext = new StorageContext();
+ this.sortMeasures = Lists.newArrayList();
+ this.sortOrders = Lists.newArrayList();
Map<String, String> parameters = _localPrarameters.get();
if (parameters != null) {
String acceptPartialResult = parameters.get(PRM_ACCEPT_PARTIAL_RESULT);
@@ -111,10 +115,14 @@ public class OLAPContext {
public Collection<TblColRef> filterColumns = new HashSet<TblColRef>();
public TupleFilter filter;
public List<JoinDesc> joins = new LinkedList<JoinDesc>();
+ private List<MeasureDesc> sortMeasures;
+ private List<SQLDigest.OrderEnum> sortOrders;
// rewrite info
public Map<String, RelDataType> rewriteFields = new HashMap<String, RelDataType>();
+ public int limit;
+
// hive query
public String sql = "";
@@ -126,7 +134,7 @@ public class OLAPContext {
public SQLDigest getSQLDigest() {
if (sqlDigest == null)
- sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations);
+ sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, sortMeasures, sortOrders);
return sqlDigest;
}
@@ -144,4 +152,12 @@ public class OLAPContext {
}
this.returnTupleInfo = info;
}
+
+    /**
+     * Records a sort on a measure. {@code sortMeasures} and {@code sortOrders} are appended
+     * together so they stay aligned by index.
+     *
+     * @param measure the measure being sorted on; a null measure is ignored
+     * @param order the sort direction for that measure
+     */
+    public void addSort(MeasureDesc measure, SQLDigest.OrderEnum order) {
+        if (measure == null) {
+            return;
+        }
+        sortMeasures.add(measure);
+        sortOrders.add(order);
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
index 60acd40..74d5de0 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
@@ -78,6 +78,7 @@ public class OLAPLimitRel extends SingleRel implements OLAPRel {
Number limitValue = (Number) (((RexLiteral) localFetch).getValue());
int limit = limitValue.intValue();
this.context.storageContext.setLimit(limit);
+ this.context.limit = limit;
if(localOffset != null) {
Number offsetValue = (Number) (((RexLiteral) localOffset).getValue());
int offset = offsetValue.intValue();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
index fa5dc1d..b023dfd 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
@@ -35,6 +35,7 @@ import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.storage.StorageContext;
import com.google.common.base.Preconditions;
@@ -82,12 +83,12 @@ public class OLAPSortRel extends Sort implements OLAPRel {
for (RelFieldCollation fieldCollation : this.collation.getFieldCollations()) {
int index = fieldCollation.getFieldIndex();
- StorageContext.OrderEnum order = getOrderEnum(fieldCollation.getDirection());
+ SQLDigest.OrderEnum order = getOrderEnum(fieldCollation.getDirection());
OLAPRel olapChild = (OLAPRel) this.getInput();
TblColRef orderCol = olapChild.getColumnRowType().getAllColumns().get(index);
MeasureDesc measure = findMeasure(orderCol);
if (measure != null) {
- this.context.storageContext.addSort(measure, order);
+ this.context.addSort(measure, order);
}
this.context.storageContext.markSort();
}
@@ -96,11 +97,11 @@ public class OLAPSortRel extends Sort implements OLAPRel {
this.columnRowType = buildColumnRowType();
}
- private StorageContext.OrderEnum getOrderEnum(RelFieldCollation.Direction direction) {
+ private SQLDigest.OrderEnum getOrderEnum(RelFieldCollation.Direction direction) {
if (direction == RelFieldCollation.Direction.DESCENDING) {
- return StorageContext.OrderEnum.DESCENDING;
+ return SQLDigest.OrderEnum.DESCENDING;
} else {
- return StorageContext.OrderEnum.ASCENDING;
+ return SQLDigest.OrderEnum.ASCENDING;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index 72d7c4a..4821ce9 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase {
@Test
public void testSingleRunQuery() throws Exception {
- String queryFileName = "src/test/resources/query/sql/query44.sql";
+ String queryFileName = "src/test/resources/query/sql/query82.sql";
File sqlFile = new File(queryFileName);
if (sqlFile.exists()) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/test/resources/query/sql/query81.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query81.sql b/query/src/test/resources/query/sql/query81.sql
new file mode 100644
index 0000000..78e30c5
--- /dev/null
+++ b/query/src/test/resources/query/sql/query81.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT
+ seller_id
+ ,sum(test_kylin_fact.price) as GMV
+ FROM test_kylin_fact inner join edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ where test_kylin_fact.cal_dt < DATE '2013-02-01'
+ group by
+ test_kylin_fact.seller_id order by gmv desc limit 100
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/query/src/test/resources/query/sql/query82.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query82.sql b/query/src/test/resources/query/sql/query82.sql
new file mode 100644
index 0000000..6b62753
--- /dev/null
+++ b/query/src/test/resources/query/sql/query82.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT
+ test_kylin_fact.cal_dt, seller_id
+ ,sum(test_kylin_fact.price) as GMV
+ FROM test_kylin_fact
+left JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ group by
+ test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by gmv desc limit 100
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
new file mode 100644
index 0000000..a8b1d02
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
+import org.apache.kylin.storage.translate.HBaseKeyRange;
+import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ */
+public class CubeSegmentTopNTupleIterator extends CubeSegmentTupleIterator{
+
+ private Iterator<Tuple> innerResultIterator;
+
+ public CubeSegmentTopNTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
+ Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, //
+ List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
+ super(cubeSeg, keyRanges, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+ this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, topNCol);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+
+ if (innerResultIterator == null) {
+ if (resultIterator == null) {
+ if (rangeIterator.hasNext() == false)
+ return false;
+
+ resultIterator = doScan(rangeIterator.next());
+ }
+
+ if (resultIterator.hasNext() == false) {
+ closeScanner();
+ resultIterator = null;
+ innerResultIterator = null;
+ return hasNext();
+ }
+
+ Result result = resultIterator.next();
+ scanCount++;
+ if (++scanCountDelta >= 1000)
+ flushScanCountDelta();
+ innerResultIterator = tupleConverter.translateTopNResult(result, oneTuple);
+ }
+
+ if (innerResultIterator.hasNext()) {
+ next = innerResultIterator.next();
+ return true;
+ } else {
+ innerResultIterator = null;
+ return hasNext();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 0110fbe..9b2cf66 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -64,25 +64,26 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
public static final int SCAN_CACHE = 1024;
- private final CubeSegment cubeSeg;
+ protected final CubeSegment cubeSeg;
private final TupleFilter filter;
private final Collection<TblColRef> groupBy;
- private final Collection<RowValueDecoder> rowValueDecoders;
+ protected final List<RowValueDecoder> rowValueDecoders;
private final StorageContext context;
private final String tableName;
private final HTableInterface table;
- private final CubeTupleConverter tupleConverter;
- private final Iterator<HBaseKeyRange> rangeIterator;
- private final Tuple oneTuple; // avoid new instance
+ protected CubeTupleConverter tupleConverter;
+ protected final Iterator<HBaseKeyRange> rangeIterator;
+ protected final Tuple oneTuple; // avoid new instance
private Scan scan;
private ResultScanner scanner;
- private Iterator<Result> resultIterator;
- private int scanCount;
- private int scanCountDelta;
- private Tuple next;
-
+ protected Iterator<Result> resultIterator;
+ protected int scanCount;
+ protected int scanCountDelta;
+ protected Tuple next;
+ protected final Cuboid cuboid;
+
public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
@@ -93,12 +94,12 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
this.context = context;
this.tableName = cubeSeg.getStorageLocationIdentifier();
- Cuboid cuboid = keyRanges.get(0).getCuboid();
+ cuboid = keyRanges.get(0).getCuboid();
for (HBaseKeyRange range : keyRanges) {
assert cuboid.equals(range.getCuboid());
}
- this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo);
+ this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, null);
this.oneTuple = new Tuple(returnTupleInfo);
this.rangeIterator = keyRanges.iterator();
@@ -108,9 +109,10 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
throw new StorageException("Error when open connection to table " + tableName, t);
}
}
-
+
@Override
public boolean hasNext() {
+
if (next != null)
return true;
@@ -136,6 +138,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
return true;
}
+
@Override
public Tuple next() {
if (next == null) {
@@ -153,7 +156,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
throw new UnsupportedOperationException();
}
- private final Iterator<Result> doScan(HBaseKeyRange keyRange) {
+ protected final Iterator<Result> doScan(HBaseKeyRange keyRange) {
Iterator<Result> iter = null;
try {
scan = buildScan(keyRange);
@@ -247,7 +250,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
return result;
}
- private void closeScanner() {
+ protected void closeScanner() {
flushScanCountDelta();
if (logger.isDebugEnabled() && scan != null) {
@@ -286,7 +289,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
closeTable();
}
- private void flushScanCountDelta() {
+ protected void flushScanCountDelta() {
context.increaseTotalScanCount(scanCountDelta);
scanCountDelta = 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 836f142..238fbed 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -18,20 +18,10 @@
package org.apache.kylin.storage.hbase.cube.v1;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Pair;
@@ -68,10 +58,7 @@ import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import java.util.*;
//v1
@SuppressWarnings("unused")
@@ -85,17 +72,32 @@ public class CubeStorageQuery implements ICachableStorageQuery {
private final CubeInstance cubeInstance;
private final CubeDesc cubeDesc;
private final String uuid;
+ private Collection<TblColRef> topNColumns;
public CubeStorageQuery(CubeInstance cube) {
this.cubeInstance = cube;
this.cubeDesc = cube.getDescriptor();
this.uuid = cube.getUuid();
+ this.topNColumns = Lists.newArrayList();
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ if (measureDesc.getFunction().isTopN()) {
+ List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
+ topNColumns.add(colRefs.get(colRefs.size() - 1));
+ }
+ }
}
-
+
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ // check whether this is a TopN query;
+ checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+
Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TblColRef topNCol = extractTopNCol(groups);
+ if (topNCol != null)
+ groups.remove(topNCol);
+
TupleFilter filter = sqlDigest.filter;
// build dimension & metrics
@@ -148,7 +150,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
setLimit(filter, context);
HConnection conn = HBaseConnection.get(context.getConnUrl());
- return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
+ return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
}
@Override
@@ -179,6 +181,12 @@ public class CubeStorageQuery implements ICachableStorageQuery {
if (sqlDigest.metricColumns.contains(column)) {
continue;
}
+
+ // skip topN display col
+ if (topNColumns.contains(column)) {
+ continue;
+ }
+
dimensions.add(column);
}
}
@@ -700,4 +708,48 @@ public class CubeStorageQuery implements ICachableStorageQuery {
ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
}
+ private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TblColRef topNDisplayCol = extractTopNCol(groups);
+ boolean hasTopN = topNDisplayCol != null;
+
+ if (hasTopN == false)
+ return;
+
+ if (sqlDigest.aggregations.size() != 1) {
+ throw new IllegalStateException("When query with topN, only one metrics is allowed.");
+ }
+
+ FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
+ if (functionDesc.isSum() == false) {
+ throw new IllegalStateException("When query with topN, only SUM function is allowed.");
+ }
+
+ FunctionDesc rewriteFunction = null;
+ // replace the SUM to the TopN function
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ if (measureDesc.getFunction().isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(measureDesc.getFunction().getParameter().getDisplayColumn())) {
+ rewriteFunction = measureDesc.getFunction();
+ break;
+ }
+ }
+
+ if (rewriteFunction == null) {
+ throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
+ }
+
+ sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
+ logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
+ }
+
+ private TblColRef extractTopNCol(Collection<TblColRef> colRefs) {
+ for (TblColRef colRef : colRefs) {
+ if (topNColumns.contains(colRef)) {
+ return colRef;
+ }
+ }
+
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index e569cbd..8813901 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -2,21 +2,31 @@ package org.apache.kylin.storage.hbase.cube.v1;
import java.io.IOException;
import java.util.BitSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowKeyDecoder;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
import org.apache.kylin.storage.tuple.Tuple;
import org.apache.kylin.storage.tuple.TupleInfo;
@@ -30,20 +40,24 @@ public class CubeTupleConverter {
final TupleInfo tupleInfo;
final RowKeyDecoder rowKeyDecoder;
final List<RowValueDecoder> rowValueDecoders;
- final List<IDerivedColumnFiller> derivedColFillers;
-
+ final List<IDerivedColumnFiller> derivedColFillers;
final int[] dimensionTupleIdx;
final int[][] metricsMeasureIdx;
final int[][] metricsTupleIdx;
+ final TblColRef topNCol;
+ int topNColTupleIdx;
+ int topNMeasureTupleIdx;
+ Dictionary<String> topNColDict;
- public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) {
+ public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo, TblColRef topNCol) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
this.tupleInfo = tupleInfo;
this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
this.rowValueDecoders = rowValueDecoders;
this.derivedColFillers = Lists.newArrayList();
-
+ this.topNCol = topNCol;
+
List<TblColRef> dimCols = cuboid.getColumns();
// pre-calculate dimension index mapping to tuple
@@ -52,6 +66,7 @@ public class CubeTupleConverter {
TblColRef col = dimCols.get(i);
dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
}
+
// pre-calculate metrics index mapping to tuple
metricsMeasureIdx = new int[rowValueDecoders.size()][];
@@ -64,6 +79,7 @@ public class CubeTupleConverter {
metricsTupleIdx[i] = new int[selectedMeasures.cardinality()];
for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) {
FunctionDesc aggrFunc = measures[mi].getFunction();
+
int tupleIdx;
// a rewrite metrics is identified by its rewrite field name
if (aggrFunc.needRewrite()) {
@@ -80,6 +96,13 @@ public class CubeTupleConverter {
}
}
+ if (this.topNCol != null) {
+ this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
+ this.topNMeasureTupleIdx = metricsTupleIdx[0][0];
+
+ this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
+ }
+
// prepare derived columns and filler
Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(dimCols, null);
for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -93,6 +116,46 @@ public class CubeTupleConverter {
}
}
+ public Iterator<Tuple> translateTopNResult(Result hbaseRow, Tuple tuple) {
+ translateResult(hbaseRow, tuple);
+ Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx];
+ assert (topNCounterObj instanceof TopNCounter);
+ return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj);
+ }
+
+ private class TopNCounterTupleIterator implements Iterator {
+
+ private Tuple tuple;
+ private Iterator<Counter> topNCounterIterator;
+ private Counter<ByteArray> counter;
+
+ private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
+ this.tuple = tuple;
+ this.topNCounterIterator = topNCounter.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return topNCounterIterator.hasNext();
+ }
+
+ @Override
+ public Tuple next() {
+ counter = topNCounterIterator.next();
+ int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length);
+ String colValue = topNColDict.getValueFromId(key);
+ tuple.setDimensionValue(topNColTupleIdx, colValue);
+ tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
+
+ return tuple;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
public void translateResult(Result hbaseRow, Tuple tuple) {
try {
byte[] rowkey = hbaseRow.getRow();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index e433b78..831cadb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -58,7 +58,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
private ITuple next;
public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
- Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
+ Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, List<RowValueDecoder> rowValueDecoders, //
StorageContext context, TupleInfo returnTupleInfo) {
this.context = context;
@@ -67,8 +67,13 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size());
Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
+ boolean useTopN = topNCol != null;
for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
- CubeSegmentTupleIterator segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+ CubeSegmentTupleIterator segIter;
+ if (useTopN)
+ segIter = new CubeSegmentTopNTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
+ else
+ segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
this.segmentIteratorList.add(segIter);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/38201be2/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
index 0b4fd07..df52664 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.common;
import static org.junit.Assert.*;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -28,6 +29,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
@@ -142,7 +144,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
int count = 0;
ITupleIterator iterator = null;
try {
- SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+ SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
while (iterator.hasNext()) {
ITuple tuple = iterator.next();
[7/7] incubator-kylin git commit: KYLIN-943 TopN in cube builder, on the way
Posted by sh...@apache.org.
KYLIN-943 TopN in cube builder, on the way
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6208520b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6208520b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6208520b
Branch: refs/heads/KYLIN-943
Commit: 6208520b412dd6a99440c496ab556dfa3aaa3e5a
Parents: 196a537
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 1 16:37:46 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/topn/TopNCounter.java | 52 +++++++
.../java/org/apache/kylin/cube/CubeManager.java | 4 +-
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 18 +--
.../org/apache/kylin/cube/model/CubeDesc.java | 35 ++++-
.../cube/model/CubeJoinedFlatTableDesc.java | 23 ++-
.../kylin/metadata/measure/TopNAggregator.java | 1 -
.../measure/serializer/DataTypeSerializer.java | 4 +
.../serializer/TopNCounterSerializer.java | 64 +++++++-
.../kylin/metadata/model/ParameterDesc.java | 18 +--
.../engine/mr/steps/BaseCuboidMapperBase.java | 79 +++++++---
.../mr/steps/FactDistinctColumnsMapperBase.java | 38 +++--
.../mr/steps/FactDistinctColumnsReducer.java | 4 +-
.../mr/steps/FactDistinctHiveColumnsMapper.java | 14 +-
.../localmeta/cube/test_kylin_cube_topn.json | 10 ++
.../cube_desc/test_kylin_cube_topn_desc.json | 146 +++++++++++++++++++
15 files changed, 429 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 2f337c2..51fe1b2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -18,9 +18,11 @@
package org.apache.kylin.common.topn;
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.Pair;
import java.io.*;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -37,6 +39,8 @@ import java.util.Map;
* @param <T> type of data in the stream to be summarized
*/
public class TopNCounter<T> implements ITopK<T>, Externalizable {
+
+ public static final int EXTRA_SPACE_RATE = 50;
protected class Bucket {
@@ -259,6 +263,26 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
}
+ public void fromExternal(int size, double[] counters, List<T> items) {
+ this.bucketList = new DoublyLinkedList<Bucket>();
+
+ this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
+
+ Bucket currentBucket = null;
+ ListNode2<Bucket> currentBucketNode = null;
+ for (int i = 0; i < size; i++) {
+ Counter<T> c = new Counter<T>();
+ c.count = counters[i];
+ c.item = items.get(i);
+ if (currentBucket == null || c.count != currentBucket.count) {
+ currentBucket = new Bucket(c.count);
+ currentBucketNode = bucketList.add(currentBucket);
+ }
+ c.bucketNode = currentBucketNode;
+ counterMap.put(c.item, currentBucket.counterList.add(c));
+ }
+ }
+
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(this.capacity);
@@ -363,4 +387,32 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
}
+
+ public double[] getCounters() {
+ double[] counters = new double[size()];
+ int index = 0;
+
+ for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Bucket b = bNode.getValue();
+ for (Counter<T> c : b.counterList) {
+ counters[index] = c.count;
+ index ++;
+ }
+ }
+
+ return counters;
+ }
+
+ public List<T> getItems() {
+ List<T> items = Lists.newArrayList();
+ for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Bucket b = bNode.getValue();
+ for (Counter<T> c : b.counterList) {
+ items.add(c.item);
+ }
+ }
+
+ return items;
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index c6e6b88..2232f01 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -163,11 +163,11 @@ public class CubeManager implements IRealizationProvider {
public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
CubeDesc cubeDesc = cubeSeg.getCubeDesc();
- if (!cubeDesc.getRowkey().isUseDictionary(col))
+ if (!cubeDesc.getAllColumnsNeedDictionary().contains(col))
return null;
DictionaryManager dictMgr = getDictionaryManager();
- DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factTableValueProvider);
+ DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), "true", col, factTableValueProvider);
if (dictInfo != null) {
cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 714b309..0e7a7ac 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -18,8 +18,6 @@
package org.apache.kylin.cube.cli;
-import java.io.IOException;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -31,6 +29,8 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
public class DictionaryGeneratorCLI {
private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
@@ -45,15 +45,13 @@ public class DictionaryGeneratorCLI {
private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
CubeManager cubeMgr = CubeManager.getInstance(config);
- for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
- // dictionary
- for (TblColRef col : dim.getColumnRefs()) {
- if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
- logger.info("Building dictionary for " + col);
- cubeMgr.buildDictionary(cubeSeg, col, factTableValueProvider);
- }
- }
+ // dictionary
+ for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionary()) {
+ logger.info("Building dictionary for " + col);
+ cubeMgr.buildDictionary(cubeSeg, col, factTableValueProvider);
+ }
+ for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
// build snapshot
if (dim.getTable() != null && !dim.getTable().equalsIgnoreCase(cubeSeg.getCubeDesc().getFactTable())) {
// CubeSegment seg = cube.getTheOnlySegment();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 0060e09..90b5474 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -129,6 +129,7 @@ public class CubeDesc extends RootPersistentEntity {
private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>();
+ private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>();
private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
@@ -628,18 +629,28 @@ public class CubeDesc extends RootPersistentEntity {
ParameterDesc p = f.getParameter();
p.normalizeColumnValue();
+ ArrayList<TblColRef> colRefs = Lists.newArrayList();
if (p.isColumnType()) {
- ArrayList<TblColRef> colRefs = Lists.newArrayList();
for (String cName : p.getValue().split("\\s*,\\s*")) {
ColumnDesc sourceColumn = factTable.findColumnByName(cName);
TblColRef colRef = new TblColRef(sourceColumn);
colRefs.add(colRef);
allColumns.add(colRef);
}
- if (colRefs.isEmpty() == false)
- p.setColRefs(colRefs);
}
+ // for topN
+ if (StringUtils.isNotEmpty(p.getDisplayColumn())) {
+ ColumnDesc sourceColumn = factTable.findColumnByName(p.getDisplayColumn());
+ TblColRef colRef = new TblColRef(sourceColumn);
+ colRefs.add(colRef);
+ measureDisplayColumns.add(colRef);
+ allColumns.add(colRef);
+ }
+
+ if (colRefs.isEmpty() == false)
+ p.setColRefs(colRefs);
+
// verify holistic count distinct as a dependent measure
if (m.getFunction().isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!");
@@ -799,4 +810,22 @@ public class CubeDesc extends RootPersistentEntity {
this.engineType = engineType;
}
+
+ public List<TblColRef> getAllColumnsNeedDictionary() {
+ List<TblColRef> result = Lists.newArrayList();
+
+ for (RowKeyColDesc rowKeyColDesc : rowkey.getRowKeyColumns()) {
+ TblColRef colRef = rowKeyColDesc.getColRef();
+ if (rowkey.isUseDictionary(colRef)) {
+ result.add(colRef);
+ }
+ }
+
+ for (TblColRef colRef : measureDisplayColumns) {
+ if (!result.contains(colRef))
+ result.add(colRef);
+ }
+ return result;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 4ac8848..adaf542 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -30,9 +30,12 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
private List<IntermediateColumnDesc> columnList = Lists.newArrayList();
+ private Map<String, Integer> columnIndexMap;
+
public CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) {
this.cubeDesc = cubeDesc;
this.cubeSegment = cubeSegment;
+ this.columnIndexMap = Maps.newHashMap();
parseCubeDesc();
}
@@ -55,26 +58,25 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName();
}
- Map<String, Integer> dimensionIndexMap = Maps.newHashMap();
int columnIndex = 0;
for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived()) {
- dimensionIndexMap.put(colName(col.getCanonicalName()), columnIndex);
+ columnIndexMap.put(colName(col.getCanonicalName()), columnIndex);
columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col));
columnIndex++;
}
- // build index
+ // build index for rowkey columns
List<TblColRef> cuboidColumns = baseCuboid.getColumns();
rowKeyColumnIndexes = new int[rowkeyColCount];
for (int i = 0; i < rowkeyColCount; i++) {
String colName = colName(cuboidColumns.get(i).getCanonicalName());
- Integer dimIdx = dimensionIndexMap.get(colName);
+ Integer dimIdx = columnIndexMap.get(colName);
if (dimIdx == null) {
throw new RuntimeException("Can't find column " + colName);
}
rowKeyColumnIndexes[i] = dimIdx;
}
-
+
List<MeasureDesc> measures = cubeDesc.getMeasures();
int measureSize = measures.size();
measureColumnIndexes = new int[measureSize][];
@@ -90,6 +92,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
measureColumnIndexes[i][j] = contains(columnList, c);
if (measureColumnIndexes[i][j] < 0) {
measureColumnIndexes[i][j] = columnIndex;
+ columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
columnIndex++;
}
@@ -154,4 +157,14 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
private static String colName(String canonicalColName) {
return canonicalColName.replace(".", "_");
}
+
+ public int getColumnIndex(TblColRef colRef) {
+ String key = colName(colRef.getCanonicalName());
+ Integer index = columnIndexMap.get(key);
+ if (index == null)
+ throw new IllegalArgumentException("Column " + colRef.toString() + " wasn't found on flat table.");
+
+ return index.intValue();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
index 9cea4cc..0de5fe8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -53,7 +53,6 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
@Override
public int getMemBytesEstimate() {
- // 1024 + 60 returned by AggregationCacheMemSizeTest
return 8 * capacity;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
index e68beda..4fadbb0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
@@ -63,6 +63,10 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
return new HLLCSerializer(type);
}
+ if (type.isTopN()) {
+ return new TopNCounterSerializer(type);
+ }
+
Class<?> clz = implementations.get(type.getName());
if (clz == null)
throw new RuntimeException("No MeasureSerializer for type " + type);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
index 1c6c442..56ed85d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -18,44 +18,92 @@
package org.apache.kylin.metadata.measure.serializer;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.topn.DoubleDeltaSerializer;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.model.DataType;
-import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
/**
*
*/
public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<TopNCounter<ByteArray>> current = new ThreadLocal<TopNCounter<ByteArray>>();
+ private DoubleDeltaSerializer dds = new DoubleDeltaSerializer();
+
+ private int precision;
+
+ public TopNCounterSerializer(DataType dataType) {
+ this.precision = dataType.getPrecision();
+ }
@Override
public int peekLength(ByteBuffer in) {
- return 0;
+ int mark = in.position();
+ in.getInt(); // capacity: not needed to compute the encoded length
+ int size = in.getInt();
+ int keyLength = in.getInt();
+ dds.deserialize(in); // advances the buffer past the delta-encoded counters
+ int len = in.position() - mark + keyLength * size; // header + counters + `size` fixed-width keys
+ in.position(mark); // peek only: restore the caller's position
+ return len;
}
@Override
public int maxLength() {
- return 0;
+ return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8);
}
@Override
public TopNCounter<ByteArray> valueOf(byte[] value) {
- return null;
+ ByteBuffer buffer = ByteBuffer.wrap(value);
+ int sizeOfId = buffer.getInt();
+ int keyEncodedValue = buffer.getInt();
+ double counter = buffer.getDouble();
+
+ ByteArray key = new ByteArray(sizeOfId);
+ BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, sizeOfId);
+
+ TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE);
+ topNCounter.offer(key, counter);
+ return topNCounter;
}
@Override
public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+ double[] counters = value.getCounters();
+ List<ByteArray> items = value.getItems();
+ int keyLength = items.isEmpty() ? 0 : items.get(0).length(); // keys assumed fixed-width (deserialize reads keyLength bytes per item)
+ out.putInt(value.getCapacity());
+ out.putInt(value.size());
+ out.putInt(keyLength);
+ dds.serialize(counters, out);
+ for (ByteArray item : items) {
+ out.put(item.array(), item.offset(), item.length()); // write only the key's own bytes, not the whole backing array
+ }
}
@Override
public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
- return null;
+ int capacity = in.getInt();
+ int size = in.getInt();
+ int keyLength = in.getInt();
+ double[] counters = dds.deserialize(in);
+ List<ByteArray> items = Lists.newArrayList();
+
+ for(int i=0; i<size; i++) {
+ ByteArray byteArray = new ByteArray(keyLength);
+ in.get(byteArray.array());
+ items.add(byteArray);
+ }
+
+ TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
+ counter.fromExternal(size, counters, items);
+ return counter;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index bdb4b91..9773b84 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -40,8 +40,8 @@ public class ParameterDesc {
@JsonProperty("value")
private String value;
- @JsonProperty("counter")
- private String counter;
+ @JsonProperty("displaycolumn")
+ private String displayColumn;
private List<TblColRef> colRefs;
@@ -65,12 +65,12 @@ public class ParameterDesc {
this.value = value;
}
- public String getCounter() {
- return counter;
+ public String getDisplayColumn() {
+ return displayColumn;
}
- public void setCounter(String counter) {
- this.counter = counter;
+ public void setDisplayColumn(String displayColumn) {
+ this.displayColumn = displayColumn;
}
public List<TblColRef> getColRefs() {
@@ -102,7 +102,7 @@ public class ParameterDesc {
ParameterDesc that = (ParameterDesc) o;
- if (counter != null ? !counter.equals(that.counter) : that.counter != null) return false;
+ if (displayColumn != null ? !displayColumn.equals(that.displayColumn) : that.displayColumn != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false;
if (value != null ? !value.equals(that.value) : that.value != null) return false;
@@ -113,13 +113,13 @@ public class ParameterDesc {
public int hashCode() {
int result = type != null ? type.hashCode() : 0;
result = 31 * result + (value != null ? value.hashCode() : 0);
- result = 31 * result + (counter != null ? counter.hashCode() : 0);
+ result = 31 * result + (displayColumn != null ? displayColumn.hashCode() : 0);
return result;
}
@Override
public String toString() {
- return "ParameterDesc [type=" + type + ", value=" + value + ", counter=" + counter + "]";
+ return "ParameterDesc [type=" + type + ", value=" + value + ", displayColumn=" + displayColumn + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index c2d0f92..ed1fd4a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -4,7 +4,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
@@ -18,14 +20,12 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +57,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
private Text outputKey = new Text();
private Text outputValue = new Text();
private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ private Map<Integer, Dictionary<String>> topNDisplayColDictMap;
@Override
protected void setup(Context context) throws IOException {
@@ -91,8 +92,24 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
keyBytesBuf = new byte[colCount][];
+ initTopNDisplayColDictionaryMap();
initNullBytes();
}
+
+ private void initTopNDisplayColDictionaryMap() {
+ topNDisplayColDictMap = Maps.newHashMap();
+ for (int measureIdx = 0; measureIdx < measures.length; measureIdx++) {
+ MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
+ FunctionDesc func = measureDesc.getFunction();
+ if (func.isTopN()) {
+ int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+ int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
+ TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
+ Dictionary<String> dictionary = (Dictionary<String>)cubeSegment.getDictionary(displayCol);
+ topNDisplayColDictMap.put(displayColIdx, dictionary);
+ }
+ }
+ }
private void initNullBytes() {
nullBytes = Lists.newArrayList();
@@ -147,27 +164,49 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
// constant
if (flatTableIdx == null) {
result = Bytes.toBytes(paramDesc.getValue());
+ }
+ // count and count distinct
+ else if (func.isCount() || func.isHolisticCountDistinct()) {
+ // note for holistic count distinct, this value will be ignored
+ result = ONE;
}
- // column values
+ // topN: encode the display (key) column with its dictionary and pair it with the counter value
+ else if (func.isTopN()) {
+ // the display column is the measure's last column ref; only the first `length` bytes of a split are valid
+ int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
+ Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
+ int keyColEncoded = displayColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value, 0, splitBuffers[keyColIndex].length));
+ valueBuf.clear();
+ valueBuf.putInt(displayColDict.getSizeOfId());
+ valueBuf.putInt(keyColEncoded);
+ if (flatTableIdx.length == 1) {
+ // only displayCol, use 1.0 as counter
+ valueBuf.putDouble(1.0);
+ } else {
+ // get the counter column value
+ valueBuf.putDouble(Double.parseDouble(Bytes.toString(splitBuffers[flatTableIdx[0]].value, 0, splitBuffers[flatTableIdx[0]].length)));
+ }
+ // valueBuf is a mapper field sized ROWVALUE_BUFFER_SIZE: return a copy of just the bytes written (int + int + double)
+ result = Arrays.copyOf(valueBuf.array(), valueBuf.position());
+
+ }
+ // normal case, concat column values
else {
- // for multiple columns, their values are joined
- for (int i = 0; i < flatTableIdx.length; i++) {
- SplittedBytes split = splitBuffers[flatTableIdx[i]];
- if (result == null) {
- result = Arrays.copyOf(split.value, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split.value, 0, newResult, result.length, split.length);
- result = newResult;
+ // for multiple columns, their values are joined
+ for (int i = 0; i < flatTableIdx.length; i++) {
+ SplittedBytes split = splitBuffers[flatTableIdx[i]];
+ if (result == null) {
+ result = Arrays.copyOf(split.value, split.length);
+ } else {
+ byte[] newResult = new byte[result.length + split.length];
+ System.arraycopy(result, 0, newResult, 0, result.length);
+ System.arraycopy(split.value, 0, newResult, result.length, split.length);
+ result = newResult;
+ }
}
}
- }
- if (func.isCount() || func.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
- result = ONE;
- }
+
if (isNull(result)) {
result = null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 098373a..b97c88a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -1,10 +1,5 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -14,7 +9,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.KylinMapper;
@@ -24,6 +19,11 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
/**
*/
public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, LongWritable, Text> {
@@ -33,7 +33,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
protected CubeSegment cubeSeg;
protected CubeDesc cubeDesc;
protected long baseCuboidId;
- protected List<TblColRef> columns;
+ protected List<TblColRef> dictionaryColumns;
protected ArrayList<Integer> factDictCols;
protected IMRTableInputFormat flatTableInputFormat;
@@ -41,6 +41,9 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
protected Text outputValue = new Text();
protected int errorRecordCounter = 0;
+ protected CubeJoinedFlatTableDesc intermediateTableDesc;
+ protected int[] dictionaryColumnIndex;
+
@Override
protected void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
@@ -52,23 +55,30 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW);
cubeDesc = cube.getDescriptor();
baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- columns = Cuboid.findById(cubeDesc, baseCuboidId).getColumns();
+ dictionaryColumns = cubeDesc.getAllColumnsNeedDictionary();
factDictCols = new ArrayList<Integer>();
- RowKeyDesc rowKey = cubeDesc.getRowkey();
DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- for (int i = 0; i < columns.size(); i++) {
- TblColRef col = columns.get(i);
- if (!rowKey.isUseDictionary(col))
- continue;
+ for (int i = 0; i < dictionaryColumns.size(); i++) {
+ TblColRef col = dictionaryColumns.get(i);
- String scanTable = dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
+ String scanTable = dictMgr.decideSourceData(cubeDesc.getModel(), "true", col).getTable();
if (cubeDesc.getModel().isFactTable(scanTable)) {
factDictCols.add(i);
}
}
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+
+ intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ dictionaryColumnIndex = new int[factDictCols.size()];
+ for (int i = 0; i < factDictCols.size(); i++) {
+ Integer column = factDictCols.get(i);
+ TblColRef colRef = dictionaryColumns.get(column);
+ int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+ dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+ }
+
}
protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index fcc12e4..85312ff 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -77,9 +77,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
cubeDesc = cube.getDescriptor();
- baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- columnList = baseCuboid.getColumns();
+ columnList = cubeDesc.getAllColumnsNeedDictionary();
collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 86b3605..e43d5d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -23,7 +23,9 @@ import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
@@ -36,14 +38,13 @@ import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+import org.apache.kylin.metadata.model.TblColRef;
/**
* @author yangli9
*/
public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
- private CubeJoinedFlatTableDesc intermediateTableDesc;
-
protected boolean collectStatistics = false;
protected CuboidScheduler cuboidScheduler = null;
protected int nRowKey;
@@ -58,7 +59,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
@Override
protected void setup(Context context) throws IOException {
super.setup(context);
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+
+
collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
if (collectStatistics) {
SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
@@ -111,9 +113,9 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
String[] row = flatTableInputFormat.parseMapperInput(record);
try {
- for (int i : factDictCols) {
- outputKey.set((long) i);
- String fieldValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+ for (int i = 0; i < factDictCols.size(); i++) {
+ outputKey.set((long) factDictCols.get(i));
+ String fieldValue = row[dictionaryColumnIndex[i]];
if (fieldValue == null)
continue;
byte[] bytes = Bytes.toBytes(fieldValue);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
new file mode 100644
index 0000000..903fc15
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
@@ -0,0 +1,10 @@
+{
+ "uuid" : "33354455-a33e-4b69-83dd-0bb8b1f8c53b",
+ "last_modified" : 0,
+ "name" : "test_kylin_cube_topn",
+ "owner" : null,
+ "version" : null,
+ "descriptor" : "test_kylin_cube_topn_desc",
+ "segments" : [ ],
+ "create_time" : null
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6208520b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
new file mode 100644
index 0000000..2e0e376
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
@@ -0,0 +1,146 @@
+{
+ "uuid": "4334a905-1fc6-4f67-985c-38fa5aeafd92",
+ "name": "test_kylin_cube_topn_desc",
+ "description": null,
+ "dimensions": [
+ {
+ "id": 0,
+ "name": "CAL_DT",
+ "table": "EDW.TEST_CAL_DT",
+ "column": null,
+ "derived": [
+ "WEEK_BEG_DT"
+ ],
+ "hierarchy": false
+ }
+ ],
+ "measures": [
+ {
+ "id": 1,
+ "name": "GMV_SUM",
+ "function": {
+ "expression": "SUM",
+ "parameter": {
+ "type": "column",
+ "value": "PRICE"
+ },
+ "returntype": "decimal(19,4)"
+ },
+ "dependent_measure_ref": null
+ },
+ {
+ "id": 2,
+ "name": "GMV_MIN",
+ "function": {
+ "expression": "MIN",
+ "parameter": {
+ "type": "column",
+ "value": "PRICE"
+ },
+ "returntype": "decimal(19,4)"
+ },
+ "dependent_measure_ref": null
+ },
+ {
+ "id": 3,
+ "name": "GMV_MAX",
+ "function": {
+ "expression": "MAX",
+ "parameter": {
+ "type": "column",
+ "value": "PRICE"
+ },
+ "returntype": "decimal(19,4)"
+ },
+ "dependent_measure_ref": null
+ },
+ {
+ "id": 4,
+ "name": "TRANS_CNT",
+ "function": {
+ "expression": "COUNT",
+ "parameter": {
+ "type": "constant",
+ "value": "1"
+ },
+ "returntype": "bigint"
+ },
+ "dependent_measure_ref": null
+ },
+ {
+ "id": 5,
+ "name": "ITEM_COUNT_SUM",
+ "function": {
+ "expression": "SUM",
+ "parameter": {
+ "type": "column",
+ "value": "ITEM_COUNT"
+ },
+ "returntype": "bigint"
+ },
+ "dependent_measure_ref": null
+ },
+ {
+ "id": 6,
+ "name": "TOP_SELLER",
+ "function": {
+ "expression": "TOP_N",
+ "parameter": {
+ "type": "column",
+ "value": "PRICE",
+ "displaycolumn": "seller_id"
+ },
+ "returntype": "topn(100)"
+ },
+ "dependent_measure_ref": null
+ }
+ ],
+ "rowkey": {
+ "rowkey_columns": [
+ {
+ "column": "cal_dt",
+ "length": 0,
+ "dictionary": "true",
+ "mandatory": false
+ }
+ ],
+ "aggregation_groups": [
+ [
+ "cal_dt"
+ ]
+ ]
+ },
+ "last_modified": 1422435345330,
+ "model_name": "test_kylin_inner_join_model_desc",
+ "null_string": null,
+ "hbase_mapping": {
+ "column_family": [
+ {
+ "name": "f1",
+ "columns": [
+ {
+ "qualifier": "m",
+ "measure_refs": [
+ "gmv_sum",
+ "gmv_min",
+ "gmv_max",
+ "trans_cnt",
+ "item_count_sum"
+ ]
+ }
+ ]
+ }, {
+ "name": "f2",
+ "columns": [
+ {
+ "qualifier": "m",
+ "measure_refs": [
+ "top_seller"
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ "notify_list": null
+}
\ No newline at end of file