You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/21 04:16:10 UTC

[04/13] incubator-kylin git commit: KYLIN-943 Update Aggregator, DataType, FunctionDesc etc for TopN

KYLIN-943 Update Aggregator, DataType, FunctionDesc etc for TopN


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bfafeed2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bfafeed2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bfafeed2

Branch: refs/heads/KYLIN-943
Commit: bfafeed273f750bf88026512ddb447a976392a39
Parents: 7175845
Author: shaofengshi <sh...@apache.org>
Authored: Wed Aug 26 17:47:38 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Sep 21 10:11:53 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/topn/Counter.java   |  4 +-
 .../kylin/common/topn/TopNCounterBasicTest.java | 39 ++++++++++++-
 .../common/topn/TopNCounterCombinationTest.java | 38 ++++++------
 .../kylin/common/topn/TopNCounterTest.java      | 10 ++--
 .../metadata/measure/MeasureAggregator.java     |  2 +
 .../kylin/metadata/measure/TopNAggregator.java  | 60 +++++++++++++++++++
 .../serializer/TopNCounterSerializer.java       | 61 ++++++++++++++++++++
 .../apache/kylin/metadata/model/DataType.java   |  8 ++-
 .../kylin/metadata/model/FunctionDesc.java      |  9 ++-
 .../kylin/metadata/model/ParameterDesc.java     | 54 ++++++++---------
 10 files changed, 226 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 0f7d8de..866d3d8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -72,14 +72,14 @@ public class Counter<T> implements Externalizable {
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         item = (T) in.readObject();
         count = in.readDouble();
-        error = in.readDouble();
+        //error = in.readDouble();
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(item);
         out.writeDouble(count);
-        out.writeDouble(error);
+        //out.writeDouble(error);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 0d59314..e25f651 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -30,8 +30,6 @@ import static org.junit.Assert.assertTrue;
 
 public class TopNCounterBasicTest {
 
-    private static final int NUM_ITERATIONS = 100000;
-
     @Test
     public void testTopNCounter() {
         TopNCounter<String> vs = new TopNCounter<String>(3);
@@ -171,4 +169,41 @@ public class TopNCounterBasicTest {
 
         assertEquals(vs.toString(), clone.toString());
     }
+    
+    @Test
+    public void testRetain() {
+        TopNCounter<String> vs = new TopNCounter<String>(10);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        
+        vs.retain(5);
+        assertTrue(vs.size() <= 5);
+        assertTrue(vs.getCapacity() <= 5);
+    }
+    
+    @Test
+    public void testMerge() {
+
+        TopNCounter<String> vs = new TopNCounter<String>(10);
+        String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B"};
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+
+        String[] stream2 = {"B", "B", "Z", "Z", "B", "C", "X", "X"};
+        TopNCounter<String> vs2 = new TopNCounter<String>(10);
+        for (String i : stream2) {
+            vs2.offer(i);
+        }
+        // merged counts: X: 4+2, B: 2+3, A: 3+0, C: 2+1, Z: 1+2, Y: 1+0 (A, C and Z tie at 3)
+        vs.merge(vs2);
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "B", "X").contains(c.getItem()));
+        }
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index afde8f7..cc0557e 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -29,37 +29,33 @@ public class TopNCounterCombinationTest extends TopNCounterTest {
     public static Collection<Integer[]> configs() {
         //       return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
         return Arrays.asList(new Integer[][] {
-                /*
                 // with 20X space
-                { 100, 10, 20 }, // top 100 among 1,000 keys (top 10%)
-                { 100, 20, 20 }, // top 100 among 2,000 keys (top 5%)
-                { 100, 100, 20 }, // top 100 among 10,000 keys (top 1%)
-                { 100, 1000, 20 }, // top 100 among 100,000 keys (top 0.1%)
-                
-                */
+                { 10, 20 }, // top 10%
+                { 20, 20 }, // top 5%
+                { 100, 20 }, // top 1%
+                { 1000, 20 }, // top 0.1%
+
                 // with 50X space
-                { 100, 10, 50 }, // top 100 among 1,000 keys (top 10%)
-                { 100, 20, 50 }, // top 100 among 2,000 keys (top 5%)
-                { 100, 100, 50 }, // top 100 among 10,000 keys (top 1%)
-                { 100, 1000, 50 }, // top 100 among 100,000 keys (top 0.1%)
+                { 10, 50 }, // top 10% 
+                { 20, 50 }, // top 5% 
+                { 100, 50 }, // top 1% 
+                { 1000, 50 }, // top 0.1%
 
-                /*
                 // with 100X space
-                { 100, 10, 100 }, // top 100 among 1,000 keys (top 10%)
-                { 100, 20, 100 }, // top 100 among 2,000 keys (top 5%)
-                { 100, 100, 100 }, // top 100 among 10,000 keys (top 1%)
-                { 100, 1000, 100 }, // top 100 among 100,000 keys (top 0.1%)
-                */
+                { 10, 100 }, // top 10% 
+                { 20, 100 }, // top 5% 
+                { 100, 100 }, // top 1% 
+                { 1000, 100 }, // top 0.1% 
         });
     }
 
-    public TopNCounterCombinationTest(int topK, int keySpaceRate, int spaceSavingRate) throws Exception {
+    public TopNCounterCombinationTest(int keySpaceRate, int spaceSavingRate) throws Exception {
         super();
-        this.TOP_K = topK;
+        this.TOP_K = 100;
         this.KEY_SPACE = TOP_K * keySpaceRate;
         this.SPACE_SAVING_ROOM = spaceSavingRate;
         TOTAL_RECORDS = 1000000; // 1 million
-        this.PARALLEL = 50;
-        this.verbose = false;
+        this.PARALLEL = 10;
+        this.verbose = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index e5350ba..edc9ce2 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -41,7 +41,7 @@ public class TopNCounterTest {
     protected static int SPACE_SAVING_ROOM;
 
     protected static int PARALLEL = 10;
-
+    
     protected static boolean verbose = true;
 
     public TopNCounterTest() {
@@ -60,7 +60,7 @@ public class TopNCounterTest {
 
         outputMsg("Start to create test random data...");
         long startTime = System.currentTimeMillis();
-        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE, 0.5);
         int keyIndex;
 
         File tempFile = File.createTempFile("ZipfDistribution", ".txt");
@@ -70,7 +70,7 @@ public class TopNCounterTest {
         FileWriter fw = new FileWriter(tempFile);
         try {
             for (int i = 0; i < TOTAL_RECORDS; i++) {
-                keyIndex = zipf.sample();
+                keyIndex = zipf.sample() -1;
                 fw.write(allKeys[keyIndex]);
                 fw.write('\n');
             }
@@ -85,7 +85,7 @@ public class TopNCounterTest {
         return tempFile.getAbsolutePath();
     }
 
-    @Test
+    //@Test
     public void testSingleSpaceSaving() throws IOException {
         String dataFile = prepareTestDate();
         TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
@@ -163,7 +163,7 @@ public class TopNCounterTest {
         if (consumers.length == 1)
             return consumers;
 
-        TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * 10);
+        TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * PARALLEL);
         
         for (int i=0, n=consumers.length; i<n; i++) {
             merged.vs.merge(consumers[i].vs);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
index ddf0782..dadf137 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -57,6 +57,8 @@ abstract public class MeasureAggregator<V> implements Serializable {
                 return new BigDecimalMinAggregator();
             else if (isDouble(returnType))
                 return new DoubleMinAggregator();
+        } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
+            return new TopNAggregator();
         }
         throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
new file mode 100644
index 0000000..9cea4cc
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+
+/**
+ * Merges incoming TopNCounter values and trims the merged result to the first input's capacity.
+ */
+public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
+
+    int capacity = 0; // capacity of the first counter aggregated after the last reset()
+    TopNCounter<ByteArray> sum = null; // running merge; null until aggregate() has been called
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    @Override
+    public void aggregate(TopNCounter<ByteArray> value) {
+        if (sum == null) {
+            sum = new TopNCounter<ByteArray>(Integer.MAX_VALUE); // no practical limit while merging
+            capacity = value.getCapacity();
+        }
+        sum.merge(value);
+    }
+
+    @Override
+    public TopNCounter<ByteArray> getState() {
+        if (sum != null) { // nothing aggregated since reset(): return null rather than throw NPE
+            sum.retain(capacity);
+        }
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        // NOTE(review): "1024 + 60 returned by AggregationCacheMemSizeTest" does not match 8 * capacity - confirm
+        return 8 * capacity;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
new file mode 100644
index 0000000..1c6c442
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.model.DataType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer for {@code TopNCounter<ByteArray>} measures; every method here is still a stub (returns 0/null, writes nothing).
+ */
+public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
+
+    // meant for thread-safety and to avoid repeated obj creation; not read by any method in this class yet
+    private ThreadLocal<TopNCounter<ByteArray>> current = new ThreadLocal<TopNCounter<ByteArray>>();
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 0;
+    }
+
+    @Override
+    public int maxLength() {
+        return 0;
+    }
+
+    @Override
+    public TopNCounter<ByteArray> valueOf(byte[] value) {
+        return null;
+    }
+
+    @Override
+    public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+
+    }
+
+    @Override
+    public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
index 2b2cbed..95262b4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
@@ -36,7 +36,7 @@ public class DataType implements Serializable {
 
     public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
             + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
-            + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc" //
+            + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn" //
             + "|" + TblColRef.InnerDataTypeEnum.LITERAL.getDataType() //
             + "|" + TblColRef.InnerDataTypeEnum.DERIVED.getDataType();
 
@@ -184,6 +184,8 @@ public class DataType implements Serializable {
             return 8;
         } else if (isHLLC()) {
             return 1 << precision;
+        } else if (isTopN()) {
+            return 8 * precision * 50;
         }
         throw new IllegalStateException("The return type : " + name + " is not recognized;");
     }
@@ -251,6 +253,10 @@ public class DataType implements Serializable {
     public boolean isHLLC() {
         return name.equals("hllc");
     }
+    
+    public boolean isTopN() {
+        return name.equals("topn");
+    }
 
     public String getName() {
         return name;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index e3f4c48..b87d50c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -34,10 +34,11 @@ public class FunctionDesc {
     public static final String FUNC_MAX = "MAX";
     public static final String FUNC_COUNT = "COUNT";
     public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+    public static final String FUNC_TOP_N = "TOP_N";
 
     public static final String PARAMTER_TYPE_CONSTANT = "constant";
     public static final String PARAMETER_TYPE_COLUMN = "column";
-
+    
     @JsonProperty("expression")
     private String expression;
     @JsonProperty("parameter")
@@ -92,6 +93,10 @@ public class FunctionDesc {
         return FUNC_COUNT_DISTINCT.equalsIgnoreCase(expression);
     }
 
+    public boolean isTopN() {
+        return FUNC_TOP_N.equalsIgnoreCase(expression);
+    }
+
     public boolean isHolisticCountDistinct() {
         if (isCountDistinct() && returnDataType != null && returnDataType.isBigInt()) {
             return true;
@@ -138,7 +143,7 @@ public class FunctionDesc {
     }
 
     public DataType getSQLType() {
-        if (isCountDistinct())
+        if (isCountDistinct() || isTopN())
             return DataType.ANY;
         else if (isSum() || isMax() || isMin())
             return parameter.getColRefs().get(0).getType();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 1565862..bdb4b91 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -39,6 +39,9 @@ public class ParameterDesc {
     private String type;
     @JsonProperty("value")
     private String value;
+    
+    @JsonProperty("counter")
+    private String counter;
 
     private List<TblColRef> colRefs;
 
@@ -62,6 +65,14 @@ public class ParameterDesc {
         this.value = value;
     }
 
+    public String getCounter() {
+        return counter;
+    }
+
+    public void setCounter(String counter) {
+        this.counter = counter;
+    }
+
     public List<TblColRef> getColRefs() {
         return colRefs;
     }
@@ -85,39 +96,30 @@ public class ParameterDesc {
     }
 
     @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((type == null) ? 0 : type.hashCode());
-        result = prime * result + ((value == null) ? 0 : value.hashCode());
-        return result;
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ParameterDesc that = (ParameterDesc) o;
+
+        if (counter != null ? !counter.equals(that.counter) : that.counter != null) return false;
+        if (type != null ? !type.equals(that.type) : that.type != null) return false;
+        if (value != null ? !value.equals(that.value) : that.value != null) return false;
+
+        return true;
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        ParameterDesc other = (ParameterDesc) obj;
-        if (type == null) {
-            if (other.type != null)
-                return false;
-        } else if (!type.equals(other.type))
-            return false;
-        if (value == null) {
-            if (other.value != null)
-                return false;
-        } else if (!value.equals(other.value))
-            return false;
-        return true;
+    public int hashCode() {
+        int result = type != null ? type.hashCode() : 0;
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        result = 31 * result + (counter != null ? counter.hashCode() : 0);
+        return result;
     }
 
     @Override
     public String toString() {
-        return "ParameterDesc [type=" + type + ", value=" + value + "]";
+        return "ParameterDesc [type=" + type + ", value=" + value + ", counter=" + counter + "]";
     }
 
 }