You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/29 03:03:55 UTC
[04/27] incubator-kylin git commit: Add merge and retain in
TopNCounter, and also added unit test
Add merge and retain in TopNCounter, and also added unit test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/90c5351b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/90c5351b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/90c5351b
Branch: refs/heads/2.x-staging
Commit: 90c5351b1f86b2a32cdd4ac8e9ce82e2d4f9a11b
Parents: efa7ecb
Author: shaofengshi <sh...@apache.org>
Authored: Mon Aug 24 22:22:05 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 29 08:56:33 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/topn/TopNCounter.java | 50 ++-
.../kylin/common/topn/TopNCounterBasicTest.java | 174 +++++++++
.../common/topn/TopNCounterCombinationTest.java | 65 ++++
.../common/topn/TopNCounterComparisonTest.java | 281 ---------------
.../kylin/common/topn/TopNCounterTest.java | 350 +++++++++++++------
5 files changed, 519 insertions(+), 401 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/90c5351b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 0a45d0b..2f337c2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -48,6 +48,10 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
this.count = count;
this.counterList = new DoublyLinkedList<Counter<T>>();
}
+
+ public int size() {
+ return counterList.size();
+ }
}
protected int capacity;
@@ -297,33 +301,36 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
- public TopNCounter<T> merge(TopNCounter<T> another) throws IOException, ClassNotFoundException {
- TopNCounter<T> secondCounter = new TopNCounter<T>(another.capacity);
- secondCounter.fromBytes(another.toBytes());
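+ /**
+ * Merge another counter into this counter. Note: the other counter is modified by this method; pass in a copy if it must be preserved.
+ * @param another the counter to merge into this one; entries shared with this counter are removed from it
+ * @return this counter, after the merge
+ */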
+ public TopNCounter<T> merge(TopNCounter<T> another) {
double m1 = 0.0, m2 = 0.0;
if (this.size() >= this.capacity) {
m1 = this.bucketList.tail().getValue().count;
}
- if (secondCounter.size() >= secondCounter.capacity) {
- m2 = secondCounter.bucketList.tail().getValue().count;
+ if (another.size() >= another.capacity) {
+ m2 = another.bucketList.tail().getValue().count;
}
for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
T item = entry.getKey();
- ListNode2<Counter<T>> existing = secondCounter.counterMap.get(item);
+ ListNode2<Counter<T>> existing = another.counterMap.get(item);
if (existing != null) {
- this.offer(item, secondCounter.counterMap.get(item).getValue().count);
- this.counterMap.get(item).getValue().error = entry.getValue().getValue().error+ secondCounter.counterMap.get(item).getValue().error;
+ this.offer(item, another.counterMap.get(item).getValue().count);
+ this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + another.counterMap.get(item).getValue().error;
- secondCounter.counterMap.remove(item);
+ another.counterMap.remove(item);
} else {
this.offer(item, m2);
this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
}
}
- for (Map.Entry<T, ListNode2<Counter<T>>> entry : secondCounter.counterMap.entrySet()) {
+ for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
T item = entry.getKey();
double counter = entry.getValue().getValue().count;
double error = entry.getValue().getValue().error;
@@ -333,4 +340,27 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
return this;
}
+
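+ /**
+ * Reduce the capacity to the given number; counters beyond it are cut off, one whole bucket at a time, starting from the tail of the bucket list.
+ * @param newCapacity the new capacity; asserted to be greater than 0
+ */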
+ public void retain(int newCapacity) {
+ assert newCapacity > 0;
+ this.capacity = newCapacity;
+ if (newCapacity < this.size()) {
+ ListNode2<Bucket> tail = bucketList.tail;
+ while (tail != null && this.size() > newCapacity) {
+ Bucket bucket = tail.getValue();
+
+ for (Counter<T> counter : bucket.counterList) {
+ this.counterMap.remove(counter.getItem());
+ }
+ tail = tail.getNext();
+ }
+
+ tail.next = null;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/90c5351b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
new file mode 100644
index 0000000..0d59314
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TopNCounterBasicTest {
+
+ private static final int NUM_ITERATIONS = 100000;
+
+ @Test
+ public void testTopNCounter() {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+ for (String i : stream) {
+ vs.offer(i);
+ /*
+ for(String s : vs.poll(3))
+ System.out.print(s+" ");
+ */
+ System.out.println(vs);
+ }
+
+ List<Counter<String>> topk = vs.topK(6);
+
+ for(Counter<String> top : topk) {
+ System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
+ }
+
+ }
+
+ @Test
+ public void testTopK() {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+ List<Counter<String>> topK = vs.topK(3);
+ for (Counter<String> c : topK) {
+ assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+ }
+ }
+
+ @Test
+ public void testTopKWithIncrement() {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i, 10);
+ }
+ List<Counter<String>> topK = vs.topK(3);
+ for (Counter<String> c : topK) {
+ assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+ }
+ }
+
+ @Test
+ public void testTopKWithIncrementOutOfOrder() {
+ TopNCounter<String> vs_increment = new TopNCounter<String>(3);
+ TopNCounter<String> vs_single = new TopNCounter<String>(3);
+ String[] stream = {"A", "B", "C", "D", "A"};
+ Integer[] increments = {15, 20, 25, 30, 1};
+
+ for (int i = 0; i < stream.length; i++) {
+ vs_increment.offer(stream[i], increments[i]);
+ for (int k = 0; k < increments[i]; k++) {
+ vs_single.offer(stream[i]);
+ }
+ }
+ System.out.println("Insert with counts vs. single inserts:");
+ System.out.println(vs_increment);
+ System.out.println(vs_single);
+
+ List<Counter<String>> topK_increment = vs_increment.topK(3);
+ List<Counter<String>> topK_single = vs_single.topK(3);
+
+ for (int i = 0; i < topK_increment.size(); i++) {
+ assertEquals(topK_increment.get(i).getItem(),
+ topK_single.get(i).getItem());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCounterSerialization() throws IOException, ClassNotFoundException {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+ List<Counter<String>> topK = vs.topK(3);
+ for (Counter<String> c : topK) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutput oo = new ObjectOutputStream(baos);
+ oo.writeObject(c);
+ oo.close();
+
+ ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ Counter<String> clone = (Counter<String>) oi.readObject();
+ assertEquals(c.getCount(), clone.getCount(), 0.0001);
+ assertEquals(c.getError(), clone.getError(), 0.0001);
+ assertEquals(c.getItem(), clone.getItem());
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSerialization() throws IOException, ClassNotFoundException {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutput oo = new ObjectOutputStream(baos);
+ oo.writeObject(vs);
+ oo.close();
+
+ ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+
+ assertEquals(vs.toString(), clone.toString());
+ }
+
+
+ @Test
+ public void testByteSerialization() throws IOException, ClassNotFoundException {
+ TopNCounter<String> vs = new TopNCounter<String>(3);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+
+ testSerialization(vs);
+
+ // Empty
+ vs = new TopNCounter<String>(0);
+ testSerialization(vs);
+ }
+
+ private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
+ byte[] bytes = vs.toBytes();
+ TopNCounter<String> clone = new TopNCounter<String>(bytes);
+
+ assertEquals(vs.toString(), clone.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/90c5351b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
new file mode 100644
index 0000000..afde8f7
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.topn;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
+public class TopNCounterCombinationTest extends TopNCounterTest {
+
+ @Parameterized.Parameters
+ public static Collection<Integer[]> configs() {
+ // return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
+ return Arrays.asList(new Integer[][] {
+ /*
+ // with 20X space
+ { 100, 10, 20 }, // top 100 among 1,000 keys (top 10%)
+ { 100, 20, 20 }, // top 100 among 2,000 keys (top 5%)
+ { 100, 100, 20 }, // top 100 among 10,000 keys (top 1%)
+ { 100, 1000, 20 }, // top 100 among 100,000 keys (top 0.1%)
+
+ */
+ // with 50X space
+ { 100, 10, 50 }, // top 100 among 1,000 keys (top 10%)
+ { 100, 20, 50 }, // top 100 among 2,000 keys (top 5%)
+ { 100, 100, 50 }, // top 100 among 10,000 keys (top 1%)
+ { 100, 1000, 50 }, // top 100 among 100,000 keys (top 0.1%)
+
+ /*
+ // with 100X space
+ { 100, 10, 100 }, // top 100 among 1,000 keys (top 10%)
+ { 100, 20, 100 }, // top 100 among 2,000 keys (top 5%)
+ { 100, 100, 100 }, // top 100 among 10,000 keys (top 1%)
+ { 100, 1000, 100 }, // top 100 among 100,000 keys (top 0.1%)
+ */
+ });
+ }
+
+ public TopNCounterCombinationTest(int topK, int keySpaceRate, int spaceSavingRate) throws Exception {
+ super();
+ this.TOP_K = topK;
+ this.KEY_SPACE = TOP_K * keySpaceRate;
+ this.SPACE_SAVING_ROOM = spaceSavingRate;
+ TOTAL_RECORDS = 1000000; // 1 million
+ this.PARALLEL = 50;
+ this.verbose = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/90c5351b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
deleted file mode 100644
index fb82522..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterComparisonTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Copyright (C) 2011 Clearspring Technologies, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.topn;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import junit.framework.Assert;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.math3.distribution.ZipfDistribution;
-import org.apache.kylin.common.util.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class TopNCounterComparisonTest {
-
- private static final int TOP_K = 100;
-
- private static final int KEY_SPACE = 100 * TOP_K;
-
- private static final int TOTAL_RECORDS = 100 * KEY_SPACE;
-
- private static final int SPACE_SAVING_ROOM = 100;
-
- @Before
- public void setup() {
- }
-
- @After
- public void tearDown() {
- }
-
- protected String prepareTestDate() throws IOException {
- String[] allKeys = new String[KEY_SPACE];
-
- for (int i = 0; i < KEY_SPACE; i++) {
- allKeys[i] = RandomStringUtils.randomAlphabetic(10);
- }
-
- System.out.println("Start to create test random data...");
- long startTime = System.currentTimeMillis();
- ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
- int keyIndex;
-
- File tempFile = File.createTempFile("ZipfDistribution", ".txt");
-
- if (tempFile.exists())
- FileUtils.forceDelete(tempFile);
- FileWriter fw = new FileWriter(tempFile);
- try {
- for (int i = 0; i < TOTAL_RECORDS; i++) {
- keyIndex = zipf.sample();
- fw.write(allKeys[keyIndex]);
- fw.write('\n');
- }
- } finally {
- if (fw != null)
- fw.close();
- }
-
- System.out.println("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
- System.out.println("Test data in : " + tempFile.getAbsolutePath());
-
- return tempFile.getAbsolutePath();
- }
-
- //@Test
- public void testCorrectness() throws IOException {
- String dataFile = prepareTestDate();
- TopNCounterComparisonTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterComparisonTest.SpaceSavingConsumer();
- TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
-
- for (TopNCounterComparisonTest.TestDataConsumer consumer : new TopNCounterComparisonTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
- feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
- }
-
- FileUtils.forceDelete(new File(dataFile));
-
- compareResult(spaceSavingCounter, accurateCounter);
- }
-
- private void compareResult(TopNCounterComparisonTest.TestDataConsumer firstConsumer, TopNCounterComparisonTest.TestDataConsumer secondConsumer) {
- List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
- System.out.println("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
- List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
- System.out.println("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
-
- int error = 0;
- for (int i = 0; i < topResult1.size(); i++) {
- System.out.println("Compare " + i);
-
- // if (topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
- if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst())
- && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
- System.out.println("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
- } else {
- System.out.println("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
- System.out.println("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
- error++;
- }
- }
-
- Assert.assertEquals(0, error);
- }
-
- @Test
- public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
- String dataFile = prepareTestDate();
-
- int PARALLEL = 10;
- TopNCounterComparisonTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterComparisonTest.SpaceSavingConsumer[PARALLEL];
-
- for (int i = 0; i < PARALLEL; i++) {
- parallelCounters[i] = new TopNCounterComparisonTest.SpaceSavingConsumer();
- }
-
- int slice = TOTAL_RECORDS / PARALLEL;
- int startPosition = 0;
- for (int i = 0; i < PARALLEL; i++) {
- feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice);
- startPosition += slice;
- }
-
- // merge counters
-
- // for (int i = 1; i < PARALLEL; i++) {
- // parallelCounters[0].vs.merge(parallelCounters[i].vs);
- // }
-
- TopNCounterComparisonTest.SpaceSavingConsumer[] mergedCounters = mergeSpaceSavingConsumer(parallelCounters);
-
- TopNCounterComparisonTest.HashMapConsumer accurateCounter = new TopNCounterComparisonTest.HashMapConsumer();
- feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
-
- compareResult(mergedCounters[0], accurateCounter);
- FileUtils.forceDelete(new File(dataFile));
-
- }
-
- private TopNCounterComparisonTest.SpaceSavingConsumer[] mergeSpaceSavingConsumer(TopNCounterComparisonTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
- List<TopNCounterComparisonTest.SpaceSavingConsumer> list = Lists.newArrayList();
- if (consumers.length == 1)
- return consumers;
-
- for (int i = 0, n = consumers.length; i < n; i = i + 2) {
- if (i + 1 < n) {
- consumers[i].vs.merge(consumers[i + 1].vs);
- }
-
- list.add(consumers[i]);
- }
-
- return mergeSpaceSavingConsumer(list.toArray(new TopNCounterComparisonTest.SpaceSavingConsumer[list.size()]));
- }
-
- private void feedDataToConsumer(String dataFile, TopNCounterComparisonTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
- long startTime = System.currentTimeMillis();
- BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
-
- int lineNum = 0;
- String line = bufferedReader.readLine();
- while (line != null) {
- if (lineNum >= startLine && lineNum < endLine) {
- consumer.addElement(line, 1.0);
- }
- line = bufferedReader.readLine();
- lineNum++;
- }
-
- bufferedReader.close();
- System.out.println("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
- }
-
- private static interface TestDataConsumer {
- public void addElement(String elementKey, double value);
-
- public List<Pair<String, Double>> getTopN(int k);
-
- public long getSpentTime();
- }
-
- private class SpaceSavingConsumer implements TopNCounterComparisonTest.TestDataConsumer {
- private long timeSpent = 0;
- protected TopNCounter<String> vs;
-
- public SpaceSavingConsumer() {
- vs = new TopNCounter<String>(TOP_K * SPACE_SAVING_ROOM);
-
- }
-
- public void addElement(String key, double value) {
- //System.out.println("Adding " + key + ":" + incrementCount);
- long startTime = System.currentTimeMillis();
- vs.offer(key, value);
- timeSpent += (System.currentTimeMillis() - startTime);
- }
-
- @Override
- public List<Pair<String, Double>> getTopN(int k) {
- long startTime = System.currentTimeMillis();
- List<Counter<String>> tops = vs.topK(k);
- List<Pair<String, Double>> allRecords = Lists.newArrayList();
-
- for (Counter<String> counter : tops)
- allRecords.add(new Pair(counter.getItem(), counter.getCount()));
- timeSpent += (System.currentTimeMillis() - startTime);
- return allRecords;
- }
-
- @Override
- public long getSpentTime() {
- return timeSpent;
- }
- }
-
- private class HashMapConsumer implements TopNCounterComparisonTest.TestDataConsumer {
-
- private long timeSpent = 0;
- private Map<String, Double> hashMap;
-
- public HashMapConsumer() {
- hashMap = Maps.newHashMap();
- }
-
- public void addElement(String key, double value) {
- long startTime = System.currentTimeMillis();
- if (hashMap.containsKey(key)) {
- hashMap.put(key, hashMap.get(key) + value);
- } else {
- hashMap.put(key, value);
- }
- timeSpent += (System.currentTimeMillis() - startTime);
- }
-
- @Override
- public List<Pair<String, Double>> getTopN(int k) {
- long startTime = System.currentTimeMillis();
- List<Pair<String, Double>> allRecords = Lists.newArrayList();
-
- for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
- allRecords.add(new Pair(entry.getKey(), entry.getValue()));
- }
-
- Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
- @Override
- public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
- return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0);
- }
- });
- timeSpent += (System.currentTimeMillis() - startTime);
- return allRecords.subList(0, k);
- }
-
- @Override
- public long getSpentTime() {
- return timeSpent;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/90c5351b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index 23620d1..e5350ba 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -16,159 +16,289 @@
package org.apache.kylin.common.topn;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
import org.junit.Test;
import java.io.*;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+public class TopNCounterTest {
+ protected static int TOP_K;
-public class TopNCounterTest {
+ protected static int KEY_SPACE;
- private static final int NUM_ITERATIONS = 100000;
+ protected static int TOTAL_RECORDS;
- @Test
- public void testTopNCounter() {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
- for (String i : stream) {
- vs.offer(i);
- /*
- for(String s : vs.poll(3))
- System.out.print(s+" ");
- */
- System.out.println(vs);
- }
-
- List<Counter<String>> topk = vs.topK(6);
-
- for(Counter<String> top : topk) {
- System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
- }
-
+ protected static int SPACE_SAVING_ROOM;
+
+ protected static int PARALLEL = 10;
+
+ protected static boolean verbose = true;
+
+ public TopNCounterTest() {
+ TOP_K = 100;
+ KEY_SPACE = 100 * TOP_K;
+ TOTAL_RECORDS = 1000000; // 1 million
+ SPACE_SAVING_ROOM = 100;
}
- @Test
- public void testTopK() {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
+ protected String prepareTestDate() throws IOException {
+ String[] allKeys = new String[KEY_SPACE];
+
+ for (int i = 0; i < KEY_SPACE; i++) {
+ allKeys[i] = RandomStringUtils.randomAlphabetic(10);
}
- List<Counter<String>> topK = vs.topK(3);
- for (Counter<String> c : topK) {
- assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+
+ outputMsg("Start to create test random data...");
+ long startTime = System.currentTimeMillis();
+ ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+ int keyIndex;
+
+ File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+ if (tempFile.exists())
+ FileUtils.forceDelete(tempFile);
+ FileWriter fw = new FileWriter(tempFile);
+ try {
+ for (int i = 0; i < TOTAL_RECORDS; i++) {
+ keyIndex = zipf.sample();
+ fw.write(allKeys[keyIndex]);
+ fw.write('\n');
+ }
+ } finally {
+ if (fw != null)
+ fw.close();
}
+
+ outputMsg("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds.");
+ outputMsg("Test data in : " + tempFile.getAbsolutePath());
+
+ return tempFile.getAbsolutePath();
}
@Test
- public void testTopKWithIncrement() {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i, 10);
+ public void testSingleSpaceSaving() throws IOException {
+ String dataFile = prepareTestDate();
+ TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
+ TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer();
+
+ for (TopNCounterTest.TestDataConsumer consumer : new TopNCounterTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+ feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
}
- List<Counter<String>> topK = vs.topK(3);
- for (Counter<String> c : topK) {
- assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+
+ FileUtils.forceDelete(new File(dataFile));
+
+ compareResult(spaceSavingCounter, accurateCounter);
+ }
+
+ private void compareResult(TopNCounterTest.TestDataConsumer firstConsumer, TopNCounterTest.TestDataConsumer secondConsumer) {
+ List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+ outputMsg("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds");
+ List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K);
+ outputMsg("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " seconds");
+
+ int error = 0;
+ for (int i = 0; i < topResult1.size(); i++) {
+ outputMsg("Compare " + i);
+
+ if (isClose(topResult1.get(i).getSecond().doubleValue(), realSequence.get(i).getSecond().doubleValue())) {
+ // if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst()) && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) {
+ outputMsg("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+ } else {
+ outputMsg("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+ outputMsg("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+ error++;
+ }
}
+
+ org.junit.Assert.assertEquals(0, error);
+ }
+
+ private boolean isClose(double value1, double value2) {
+
+ if(Math.abs(value1 - value2) < 5.0)
+ return true;
+
+ return false;
}
+ /**
+ * Splits the generated data file into PARALLEL consecutive slices, feeds each
+ * slice to its own space-saving counter, merges the counters with
+ * {@code singleMerge} and compares the merged top-N with an exact HashMap count
+ * over the whole file.
+ */
@Test
- public void testTopKWithIncrementOutOfOrder() {
- TopNCounter<String> vs_increment = new TopNCounter<String>(3);
- TopNCounter<String> vs_single = new TopNCounter<String>(3);
- String[] stream = {"A", "B", "C", "D", "A"};
- Integer[] increments = {15, 20, 25, 30, 1};
-
- for (int i = 0; i < stream.length; i++) {
- vs_increment.offer(stream[i], increments[i]);
- for (int k = 0; k < increments[i]; k++) {
- vs_single.offer(stream[i]);
- }
+ public void testParallelSpaceSaving() throws IOException, ClassNotFoundException {
+ String dataFile = prepareTestDate();
+ try {
+ TopNCounterTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterTest.SpaceSavingConsumer[PARALLEL];
+
+ for (int i = 0; i < PARALLEL; i++) {
+ parallelCounters[i] = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
}
- System.out.println("Insert with counts vs. single inserts:");
- System.out.println(vs_increment);
- System.out.println(vs_single);
- List<Counter<String>> topK_increment = vs_increment.topK(3);
- List<Counter<String>> topK_single = vs_single.topK(3);
+ int slice = TOTAL_RECORDS / PARALLEL;
+ int startPosition = 0;
+ for (int i = 0; i < PARALLEL; i++) {
+ // The integer division above truncates, so the last slice is extended to
+ // TOTAL_RECORDS; otherwise the trailing records would reach the exact
+ // counter but none of the parallel counters.
+ int endPosition = (i == PARALLEL - 1) ? TOTAL_RECORDS : startPosition + slice;
+ feedDataToConsumer(dataFile, parallelCounters[i], startPosition, endPosition);
+ startPosition = endPosition;
+ }
+
+ TopNCounterTest.SpaceSavingConsumer[] mergedCounters = singleMerge(parallelCounters);
+
+ TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer();
+ feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+ compareResult(mergedCounters[0], accurateCounter);
+ } finally {
+ // Delete the generated data file even when an assertion above fails.
+ FileUtils.forceDelete(new File(dataFile));
+ }
+ }
+
+ /**
+ * Merges all given consumers into one freshly created consumer in a single pass.
+ * The target is created with ten times the normal capacity and is cut back to
+ * {@code TOP_K * SPACE_SAVING_ROOM} entries once every input has been merged.
+ *
+ * @param consumers the consumers to merge
+ * @return {@code consumers} itself when it holds exactly one element, otherwise a
+ * one-element array holding the merged consumer
+ */
+ private TopNCounterTest.SpaceSavingConsumer[] singleMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+ if (consumers.length == 1) {
+ return consumers;
+ }
- for (int i = 0; i < topK_increment.size(); i++) {
- assertEquals(topK_increment.get(i).getItem(),
- topK_single.get(i).getItem());
+ TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * 10);
+
+ for (TopNCounterTest.SpaceSavingConsumer consumer : consumers) {
+ merged.vs.merge(consumer.vs);
}
+
+ merged.vs.retain(TOP_K * SPACE_SAVING_ROOM); // remove the extra elements
+ return new TopNCounterTest.SpaceSavingConsumer[] { merged };
+
}
- @SuppressWarnings("unchecked")
- @Test
- public void testCounterSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
- }
- List<Counter<String>> topK = vs.topK(3);
- for (Counter<String> c : topK) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutput oo = new ObjectOutputStream(baos);
- oo.writeObject(c);
- oo.close();
-
- ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- Counter<String> clone = (Counter<String>) oi.readObject();
- assertEquals(c.getCount(), clone.getCount(), 0.0001);
- assertEquals(c.getError(), clone.getError(), 0.0001);
- assertEquals(c.getItem(), clone.getItem());
+ /**
+ * Merges the consumers pairwise, round after round, until at most one is left.
+ * In every round the consumer at an odd index is merged into its left neighbour,
+ * so the consumers passed in are modified in place.
+ *
+ * @param consumers the consumers to merge
+ * @return {@code consumers} itself when it holds zero or one element, otherwise a
+ * one-element array holding the consumer everything was merged into
+ */
+ private TopNCounterTest.SpaceSavingConsumer[] binaryMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException {
+ // Also covers the empty array: with no consumers the loop below adds nothing
+ // and the recursive call would never terminate.
+ if (consumers.length <= 1) {
+ return consumers;
+ }
+
+ List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+ for (int i = 0, n = consumers.length; i < n; i = i + 2) {
+ if (i + 1 < n) {
+ consumers[i].vs.merge(consumers[i + 1].vs);
+ }
+
+ list.add(consumers[i]);
}
+
+ return binaryMerge(list.toArray(new TopNCounterTest.SpaceSavingConsumer[list.size()]));
}
+
+ /**
+ * Reads {@code dataFile} line by line and hands every line whose zero-based
+ * number lies in {@code [startLine, endLine)} to {@code consumer} with a value of
+ * 1.0. Reading stops once {@code endLine} is reached, and the reader is closed
+ * even when reading or the consumer throws.
+ *
+ * @param dataFile path of the text file holding one key per line
+ * @param consumer receiver of the selected lines
+ * @param startLine zero-based number of the first line to feed, inclusive
+ * @param endLine zero-based number of the line at which feeding stops, exclusive
+ * @throws IOException if the file cannot be opened or read
+ */
+ private void feedDataToConsumer(String dataFile, TopNCounterTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException {
+ long startTime = System.currentTimeMillis();
+ BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile));
- @SuppressWarnings("unchecked")
- @Test
- public void testSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
+ try {
+ int lineNum = 0;
+ String line = bufferedReader.readLine();
+ // Lines at or beyond endLine are never fed, so there is no need to read them.
+ while (line != null && lineNum < endLine) {
+ if (lineNum >= startLine) {
+ consumer.addElement(line, 1.0);
+ }
+ line = bufferedReader.readLine();
+ lineNum++;
}
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutput oo = new ObjectOutputStream(baos);
- oo.writeObject(vs);
- oo.close();
+ } finally {
+ bufferedReader.close();
+ }
+ outputMsg("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+ }
- ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
+ /**
+ * Prints {@code msg} to standard output when {@code verbose} is set; does nothing
+ * otherwise.
+ */
+ private void outputMsg(String msg) {
+ if (!verbose) {
+ return;
+ }
+ System.out.println(msg);
+ }
- assertEquals(vs.toString(), clone.toString());
+ /**
+ * A sink for weighted keys that can report the k largest keys it has seen,
+ * together with the time it has spent doing so.
+ */
+ private interface TestDataConsumer {
+ /** Adds {@code value} to the count kept for {@code elementKey}. */
+ void addElement(String elementKey, double value);
+
+ /** Returns up to {@code k} (key, count) pairs, largest counts first. */
+ List<Pair<String, Double>> getTopN(int k);
+
+ /** Returns the time, in milliseconds, spent so far inside this consumer. */
+ long getSpentTime();
}
+ /**
+ * {@code TestDataConsumer} backed by a {@code TopNCounter}. The time spent in the
+ * counter's {@code offer} and {@code topK} calls is accumulated in milliseconds.
+ */
+ private class SpaceSavingConsumer implements TopNCounterTest.TestDataConsumer {
+ private long timeSpent = 0;
+ protected TopNCounter<String> vs;
+
+ /**
+ * @param space capacity handed to the underlying {@code TopNCounter}
+ */
+ public SpaceSavingConsumer(int space) {
+ vs = new TopNCounter<String>(space);
- @Test
- public void testByteSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
}
- testSerialization(vs);
+
+ @Override
+ public void addElement(String key, double value) {
+ long startTime = System.currentTimeMillis();
+ vs.offer(key, value);
+ timeSpent += (System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public List<Pair<String, Double>> getTopN(int k) {
+ long startTime = System.currentTimeMillis();
+ List<Counter<String>> tops = vs.topK(k);
+ List<Pair<String, Double>> allRecords = Lists.newArrayList();
- // Empty
- vs = new TopNCounter<String>(0);
- testSerialization(vs);
+ for (Counter<String> counter : tops) {
+ // Explicit type arguments avoid the unchecked conversion of a raw Pair.
+ allRecords.add(new Pair<String, Double>(counter.getItem(), counter.getCount()));
+ }
+ timeSpent += (System.currentTimeMillis() - startTime);
+ return allRecords;
+ }
+
+ @Override
+ public long getSpentTime() {
+ return timeSpent;
+ }
}
- private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
- byte[] bytes = vs.toBytes();
- TopNCounter<String> clone = new TopNCounter<String>(bytes);
+ /**
+ * {@code TestDataConsumer} that keeps an exact count per key in a hash map and
+ * sorts all entries on demand. The time spent in {@code addElement} and
+ * {@code getTopN} is accumulated in milliseconds.
+ */
+ private class HashMapConsumer implements TopNCounterTest.TestDataConsumer {
+
+ private long timeSpent = 0;
+ private Map<String, Double> hashMap;
+
+ public HashMapConsumer() {
+ hashMap = Maps.newHashMap();
+ }
+
+ @Override
+ public void addElement(String key, double value) {
+ long startTime = System.currentTimeMillis();
+ // A single lookup covers both the "seen before" and the "new key" case.
+ Double current = hashMap.get(key);
+ hashMap.put(key, current == null ? value : current + value);
+ timeSpent += (System.currentTimeMillis() - startTime);
+ }
+
+ /**
+ * Returns the entries with the largest counts, largest first. Fewer than
+ * {@code k} entries are returned when fewer than {@code k} distinct keys
+ * have been added.
+ */
+ @Override
+ public List<Pair<String, Double>> getTopN(int k) {
+ long startTime = System.currentTimeMillis();
+ List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+ for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+ allRecords.add(new Pair<String, Double>(entry.getKey(), entry.getValue()));
+ }
- assertEquals(vs.toString(), clone.toString());
+ Collections.sort(allRecords, new Comparator<Pair<String, Double>>() {
+ @Override
+ public int compare(Pair<String, Double> o1, Pair<String, Double> o2) {
+ // Arguments are swapped to sort in descending order of count.
+ return Double.compare(o2.getSecond(), o1.getSecond());
+ }
+ });
+ timeSpent += (System.currentTimeMillis() - startTime);
+ // Clamp the bound: subList throws when asked for more entries than exist.
+ return allRecords.subList(0, Math.min(k, allRecords.size()));
+ }
+
+ @Override
+ public long getSpentTime() {
+ return timeSpent;
+ }
}
+
}