You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/17 08:10:47 UTC
[3/7] incubator-kylin git commit: KYLIN-943 Update TopNCounter serialization to ascending order
KYLIN-943 Update TopNCounter serialization to ascending order
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f3fe0e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f3fe0e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f3fe0e50
Branch: refs/heads/KYLIN-943
Commit: f3fe0e509fdf6a87baeb917b7588f390064afa6f
Parents: 6208520
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 1 21:02:02 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/topn/TopNCounter.java | 78 +++----------
.../kylin/common/topn/TopNCounterBasicTest.java | 110 +++----------------
.../serializer/TopNCounterSerializerTest.java | 41 +++++++
3 files changed, 71 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 51fe1b2..69e8d56 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -38,7 +38,7 @@ import java.util.Map;
*
* @param <T> type of data in the stream to be summarized
*/
-public class TopNCounter<T> implements ITopK<T>, Externalizable {
+public class TopNCounter<T> implements ITopK<T> {
public static final int EXTRA_SPACE_RATE = 50;
@@ -101,7 +101,7 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
* @param item stream element (<i>e</i>)
* @return item dropped from summary if an item was dropped, null otherwise
*/
- public T offerReturnDropped(T item, int incrementCount) {
+ public T offerReturnDropped(T item, double incrementCount) {
return offerReturnAll(item, incrementCount).getSecond();
}
@@ -241,28 +241,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
return sb.toString();
}
- @SuppressWarnings("unchecked")
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- this.bucketList = new DoublyLinkedList<Bucket>();
- this.capacity = in.readInt();
-
- int size = in.readInt();
- this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
-
- Bucket currentBucket = null;
- ListNode2<Bucket> currentBucketNode = null;
- for (int i = 0; i < size; i++) {
- Counter<T> c = (Counter<T>) in.readObject();
- if (currentBucket == null || c.count != currentBucket.count) {
- currentBucket = new Bucket(c.count);
- currentBucketNode = bucketList.add(currentBucket);
- }
- c.bucketNode = currentBucketNode;
- counterMap.put(c.item, currentBucket.counterList.add(c));
- }
- }
-
public void fromExternal(int size, double[] counters, List<T> items) {
this.bucketList = new DoublyLinkedList<Bucket>();
@@ -283,18 +261,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
}
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(this.capacity);
- out.writeInt(this.size());
- for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- out.writeObject(c);
- }
- }
- }
-
/**
* For de-serialization
*/
@@ -302,30 +268,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
/**
- * For de-serialization
- *
- * @param bytes
- * @throws java.io.IOException
- * @throws ClassNotFoundException
- */
- public TopNCounter(byte[] bytes) throws IOException, ClassNotFoundException {
- fromBytes(bytes);
- }
-
- public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
- readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
- }
-
- public byte[] toBytes() throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(baos);
- this.writeExternal(out);
- out.flush();
- return baos.toByteArray();
-
- }
-
- /**
* Merge another counter into this counter; Note, the other counter will be changed in this method; please make a copy and passed in here;
* @param another
* @return
@@ -387,12 +329,16 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
}
}
-
+
+ /**
+ * Get the counter values in ascending order
+ * @return
+ */
public double[] getCounters() {
double[] counters = new double[size()];
int index = 0;
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
Bucket b = bNode.getValue();
for (Counter<T> c : b.counterList) {
counters[index] = c.count;
@@ -402,10 +348,14 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
return counters;
}
-
+
+ /**
+ * Get the item list order by counter values in ascending order
+ * @return
+ */
public List<T> getItems() {
List<T> items = Lists.newArrayList();
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+ for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
Bucket b = bNode.getValue();
for (Counter<T> c : b.counterList) {
items.add(c.item);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index e25f651..252e955 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -16,45 +16,36 @@
package org.apache.kylin.common.topn;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
import org.junit.Test;
-import java.io.*;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-
public class TopNCounterBasicTest {
@Test
public void testTopNCounter() {
TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y" };
for (String i : stream) {
- vs.offer(i);
- /*
- for(String s : vs.poll(3))
- System.out.print(s+" ");
- */
- System.out.println(vs);
+ vs.offer(i);
}
List<Counter<String>> topk = vs.topK(6);
-
- for(Counter<String> top : topk) {
+
+ for (Counter<String> top : topk) {
System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
}
-
+
}
@Test
public void testTopK() {
TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
for (String i : stream) {
vs.offer(i);
}
@@ -67,7 +58,7 @@ public class TopNCounterBasicTest {
@Test
public void testTopKWithIncrement() {
TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
for (String i : stream) {
vs.offer(i, 10);
}
@@ -81,8 +72,8 @@ public class TopNCounterBasicTest {
public void testTopKWithIncrementOutOfOrder() {
TopNCounter<String> vs_increment = new TopNCounter<String>(3);
TopNCounter<String> vs_single = new TopNCounter<String>(3);
- String[] stream = {"A", "B", "C", "D", "A"};
- Integer[] increments = {15, 20, 25, 30, 1};
+ String[] stream = { "A", "B", "C", "D", "A" };
+ Integer[] increments = { 15, 20, 25, 30, 1 };
for (int i = 0; i < stream.length; i++) {
vs_increment.offer(stream[i], increments[i]);
@@ -98,102 +89,33 @@ public class TopNCounterBasicTest {
List<Counter<String>> topK_single = vs_single.topK(3);
for (int i = 0; i < topK_increment.size(); i++) {
- assertEquals(topK_increment.get(i).getItem(),
- topK_single.get(i).getItem());
+ assertEquals(topK_increment.get(i).getItem(), topK_single.get(i).getItem());
}
}
- @SuppressWarnings("unchecked")
- @Test
- public void testCounterSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
- }
- List<Counter<String>> topK = vs.topK(3);
- for (Counter<String> c : topK) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutput oo = new ObjectOutputStream(baos);
- oo.writeObject(c);
- oo.close();
-
- ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- Counter<String> clone = (Counter<String>) oi.readObject();
- assertEquals(c.getCount(), clone.getCount(), 0.0001);
- assertEquals(c.getError(), clone.getError(), 0.0001);
- assertEquals(c.getItem(), clone.getItem());
- }
- }
-
-
- @SuppressWarnings("unchecked")
- @Test
- public void testSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
- }
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutput oo = new ObjectOutputStream(baos);
- oo.writeObject(vs);
- oo.close();
-
- ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
-
- assertEquals(vs.toString(), clone.toString());
- }
-
-
- @Test
- public void testByteSerialization() throws IOException, ClassNotFoundException {
- TopNCounter<String> vs = new TopNCounter<String>(3);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
- for (String i : stream) {
- vs.offer(i);
- }
-
- testSerialization(vs);
-
- // Empty
- vs = new TopNCounter<String>(0);
- testSerialization(vs);
- }
-
- private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
- byte[] bytes = vs.toBytes();
- TopNCounter<String> clone = new TopNCounter<String>(bytes);
-
- assertEquals(vs.toString(), clone.toString());
- }
-
@Test
public void testRetain() {
TopNCounter<String> vs = new TopNCounter<String>(10);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
for (String i : stream) {
vs.offer(i);
}
-
+
vs.retain(5);
assertTrue(vs.size() <= 5);
assertTrue(vs.getCapacity() <= 5);
}
-
+
@Test
public void testMerge() {
TopNCounter<String> vs = new TopNCounter<String>(10);
- String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B"};
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B" };
for (String i : stream) {
vs.offer(i);
}
-
- String[] stream2 = {"B", "B", "Z", "Z", "B", "C", "X", "X"};
+ String[] stream2 = { "B", "B", "Z", "Z", "B", "C", "X", "X" };
TopNCounter<String> vs2 = new TopNCounter<String>(10);
for (String i : stream2) {
vs2.offer(i);
@@ -204,6 +126,6 @@ public class TopNCounterBasicTest {
for (Counter<String> c : topK) {
assertTrue(Arrays.asList("A", "B", "X").contains(c.getItem()));
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
new file mode 100644
index 0000000..050193b
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.model.DataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public class TopNCounterSerializerTest {
+
+ private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getInstance("topn(10)"));
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCounterSerialization() {
+ TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
+ Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
+ for (Integer i : stream) {
+ vs.offer(new ByteArray(Bytes.toBytes(i)));
+ }
+
+ ByteBuffer out = ByteBuffer.allocate(1024 * 1024);
+ serializer.serialize(vs, out);
+
+ byte[] copyBytes = new byte[out.position()];
+ System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
+
+ ByteBuffer in = ByteBuffer.wrap(copyBytes);
+ TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
+
+ Assert.assertEquals(vs.toString(), vsNew.toString());
+
+ }
+
+}