You are viewing a plain-text version of this content. The canonical link for it (originally shown as "here") was not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/17 08:10:47 UTC

[3/7] incubator-kylin git commit: KYLIN-943 Update TopNCounter serialization to ascending order

KYLIN-943 Update TopNCounter serialization to ascending order


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f3fe0e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f3fe0e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f3fe0e50

Branch: refs/heads/KYLIN-943
Commit: f3fe0e509fdf6a87baeb917b7588f390064afa6f
Parents: 6208520
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 1 21:02:02 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Sep 17 14:07:21 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/topn/TopNCounter.java   |  78 +++----------
 .../kylin/common/topn/TopNCounterBasicTest.java | 110 +++----------------
 .../serializer/TopNCounterSerializerTest.java   |  41 +++++++
 3 files changed, 71 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 51fe1b2..69e8d56 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -38,7 +38,7 @@ import java.util.Map;
  *
  * @param <T> type of data in the stream to be summarized
  */
-public class TopNCounter<T> implements ITopK<T>, Externalizable {
+public class TopNCounter<T> implements ITopK<T> {
     
     public static final int EXTRA_SPACE_RATE = 50;
 
@@ -101,7 +101,7 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
      * @param item stream element (<i>e</i>)
      * @return item dropped from summary if an item was dropped, null otherwise
      */
-    public T offerReturnDropped(T item, int incrementCount) {
+    public T offerReturnDropped(T item, double incrementCount) {
         return offerReturnAll(item, incrementCount).getSecond();
     }
 
@@ -241,28 +241,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
         return sb.toString();
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        this.bucketList = new DoublyLinkedList<Bucket>();
-        this.capacity = in.readInt();
-
-        int size = in.readInt();
-        this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
-
-        Bucket currentBucket = null;
-        ListNode2<Bucket> currentBucketNode = null;
-        for (int i = 0; i < size; i++) {
-            Counter<T> c = (Counter<T>) in.readObject();
-            if (currentBucket == null || c.count != currentBucket.count) {
-                currentBucket = new Bucket(c.count);
-                currentBucketNode = bucketList.add(currentBucket);
-            }
-            c.bucketNode = currentBucketNode;
-            counterMap.put(c.item, currentBucket.counterList.add(c));
-        }
-    }
-
     public void fromExternal(int size, double[] counters, List<T> items) {
         this.bucketList = new DoublyLinkedList<Bucket>();
 
@@ -283,18 +261,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
         }
     }
 
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(this.capacity);
-        out.writeInt(this.size());
-        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
-            Bucket b = bNode.getValue();
-            for (Counter<T> c : b.counterList) {
-                out.writeObject(c);
-            }
-        }
-    }
-
     /**
      * For de-serialization
      */
@@ -302,30 +268,6 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
     }
 
     /**
-     * For de-serialization
-     *
-     * @param bytes
-     * @throws java.io.IOException
-     * @throws ClassNotFoundException
-     */
-    public TopNCounter(byte[] bytes) throws IOException, ClassNotFoundException {
-        fromBytes(bytes);
-    }
-
-    public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
-        readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
-    }
-
-    public byte[] toBytes() throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream out = new ObjectOutputStream(baos);
-        this.writeExternal(out);
-        out.flush();
-        return baos.toByteArray();
-
-    }
-
-    /**
      * Merge another counter into this counter; Note, the other counter will be changed in this method; please make a copy and passed in here;
      * @param another
      * @return
@@ -387,12 +329,16 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
         }
 
     }
-    
+
+    /**
+     * Get the counter values in ascending order
+     * @return
+     */
     public double[] getCounters() {
         double[] counters = new double[size()];
         int index = 0;
 
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
             Bucket b = bNode.getValue();
             for (Counter<T> c : b.counterList) {
                 counters[index] = c.count;
@@ -402,10 +348,14 @@ public class TopNCounter<T> implements ITopK<T>, Externalizable {
 
         return counters;
     }
-    
+
+    /**
+     * Get the item list order by counter values in ascending order
+     * @return
+     */
     public List<T> getItems() {
         List<T> items = Lists.newArrayList();
-        for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
+        for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
             Bucket b = bNode.getValue();
             for (Counter<T> c : b.counterList) {
                 items.add(c.item);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index e25f651..252e955 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -16,45 +16,36 @@
 
 package org.apache.kylin.common.topn;
 
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.junit.Test;
 
-import java.io.*;
 import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-
 public class TopNCounterBasicTest {
 
     @Test
     public void testTopNCounter() {
         TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y" };
         for (String i : stream) {
-                vs.offer(i);
-            /*
-        for(String s : vs.poll(3))
-        System.out.print(s+" ");
-             */
-            System.out.println(vs);
+            vs.offer(i);
         }
 
         List<Counter<String>> topk = vs.topK(6);
-        
-        for(Counter<String> top : topk) {
+
+        for (Counter<String> top : topk) {
             System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
         }
-        
+
     }
 
     @Test
     public void testTopK() {
         TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
             vs.offer(i);
         }
@@ -67,7 +58,7 @@ public class TopNCounterBasicTest {
     @Test
     public void testTopKWithIncrement() {
         TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
             vs.offer(i, 10);
         }
@@ -81,8 +72,8 @@ public class TopNCounterBasicTest {
     public void testTopKWithIncrementOutOfOrder() {
         TopNCounter<String> vs_increment = new TopNCounter<String>(3);
         TopNCounter<String> vs_single = new TopNCounter<String>(3);
-        String[] stream = {"A", "B", "C", "D", "A"};
-        Integer[] increments = {15, 20, 25, 30, 1};
+        String[] stream = { "A", "B", "C", "D", "A" };
+        Integer[] increments = { 15, 20, 25, 30, 1 };
 
         for (int i = 0; i < stream.length; i++) {
             vs_increment.offer(stream[i], increments[i]);
@@ -98,102 +89,33 @@ public class TopNCounterBasicTest {
         List<Counter<String>> topK_single = vs_single.topK(3);
 
         for (int i = 0; i < topK_increment.size(); i++) {
-            assertEquals(topK_increment.get(i).getItem(),
-                    topK_single.get(i).getItem());
+            assertEquals(topK_increment.get(i).getItem(), topK_single.get(i).getItem());
         }
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testCounterSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
-        }
-        List<Counter<String>> topK = vs.topK(3);
-        for (Counter<String> c : topK) {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutput oo = new ObjectOutputStream(baos);
-            oo.writeObject(c);
-            oo.close();
-
-            ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-            Counter<String> clone = (Counter<String>) oi.readObject();
-            assertEquals(c.getCount(), clone.getCount(), 0.0001);
-            assertEquals(c.getError(), clone.getError(), 0.0001);
-            assertEquals(c.getItem(), clone.getItem());
-        }
-    }
-
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
-        }
-
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutput oo = new ObjectOutputStream(baos);
-        oo.writeObject(vs);
-        oo.close();
-
-        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-        TopNCounter<String> clone = (TopNCounter<String>) oi.readObject();
-
-        assertEquals(vs.toString(), clone.toString());
-    }
-
-
-    @Test
-    public void testByteSerialization() throws IOException, ClassNotFoundException {
-        TopNCounter<String> vs = new TopNCounter<String>(3);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
-        for (String i : stream) {
-            vs.offer(i);
-        }
-
-        testSerialization(vs);
-
-        // Empty
-        vs = new TopNCounter<String>(0);
-        testSerialization(vs);
-    }
-
-    private void testSerialization(TopNCounter<?> vs) throws IOException, ClassNotFoundException {
-        byte[] bytes = vs.toBytes();
-        TopNCounter<String> clone = new TopNCounter<String>(bytes);
-
-        assertEquals(vs.toString(), clone.toString());
-    }
-    
     @Test
     public void testRetain() {
         TopNCounter<String> vs = new TopNCounter<String>(10);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
             vs.offer(i);
         }
-        
+
         vs.retain(5);
         assertTrue(vs.size() <= 5);
         assertTrue(vs.getCapacity() <= 5);
     }
-    
+
     @Test
     public void testMerge() {
 
         TopNCounter<String> vs = new TopNCounter<String>(10);
-        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B"};
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B" };
         for (String i : stream) {
             vs.offer(i);
         }
 
-
-        String[] stream2 = {"B", "B", "Z", "Z", "B", "C", "X", "X"};
+        String[] stream2 = { "B", "B", "Z", "Z", "B", "C", "X", "X" };
         TopNCounter<String> vs2 = new TopNCounter<String>(10);
         for (String i : stream2) {
             vs2.offer(i);
@@ -204,6 +126,6 @@ public class TopNCounterBasicTest {
         for (Counter<String> c : topK) {
             assertTrue(Arrays.asList("A", "B", "X").contains(c.getItem()));
         }
-        
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f3fe0e50/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
new file mode 100644
index 0000000..050193b
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.model.DataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * 
+ */
+public class TopNCounterSerializerTest {
+
+    private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getInstance("topn(10)"));
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCounterSerialization() {
+        TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
+        Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
+        for (Integer i : stream) {
+            vs.offer(new ByteArray(Bytes.toBytes(i)));
+        }
+
+        ByteBuffer out = ByteBuffer.allocate(1024 * 1024);
+        serializer.serialize(vs, out);
+        
+        byte[] copyBytes = new byte[out.position()];
+        System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
+
+        ByteBuffer in = ByteBuffer.wrap(copyBytes);
+        TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
+
+        Assert.assertEquals(vs.toString(), vsNew.toString());
+
+    }
+
+}