You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/21 04:16:10 UTC
[04/13] incubator-kylin git commit: KYLIN-943 Update Aggregator,
DataType, FunctionDesc etc for TopN
KYLIN-943 Update Aggregator, DataType, FunctionDesc etc for TopN
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bfafeed2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bfafeed2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bfafeed2
Branch: refs/heads/KYLIN-943
Commit: bfafeed273f750bf88026512ddb447a976392a39
Parents: 7175845
Author: shaofengshi <sh...@apache.org>
Authored: Wed Aug 26 17:47:38 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Sep 21 10:11:53 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/topn/Counter.java | 4 +-
.../kylin/common/topn/TopNCounterBasicTest.java | 39 ++++++++++++-
.../common/topn/TopNCounterCombinationTest.java | 38 ++++++------
.../kylin/common/topn/TopNCounterTest.java | 10 ++--
.../metadata/measure/MeasureAggregator.java | 2 +
.../kylin/metadata/measure/TopNAggregator.java | 60 +++++++++++++++++++
.../serializer/TopNCounterSerializer.java | 61 ++++++++++++++++++++
.../apache/kylin/metadata/model/DataType.java | 8 ++-
.../kylin/metadata/model/FunctionDesc.java | 9 ++-
.../kylin/metadata/model/ParameterDesc.java | 54 ++++++++---------
10 files changed, 226 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 0f7d8de..866d3d8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -72,14 +72,14 @@ public class Counter<T> implements Externalizable {
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
item = (T) in.readObject();
count = in.readDouble();
- error = in.readDouble();
+ //error = in.readDouble();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(item);
out.writeDouble(count);
- out.writeDouble(error);
+ //out.writeDouble(error);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 0d59314..e25f651 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -30,8 +30,6 @@ import static org.junit.Assert.assertTrue;
public class TopNCounterBasicTest {
- private static final int NUM_ITERATIONS = 100000;
-
@Test
public void testTopNCounter() {
TopNCounter<String> vs = new TopNCounter<String>(3);
@@ -171,4 +169,41 @@ public class TopNCounterBasicTest {
assertEquals(vs.toString(), clone.toString());
}
+
+ @Test
+ public void testRetain() {
+ TopNCounter<String> vs = new TopNCounter<String>(10);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+
+ vs.retain(5);
+ assertTrue(vs.size() <= 5);
+ assertTrue(vs.getCapacity() <= 5);
+ }
+
+ @Test
+ public void testMerge() {
+
+ TopNCounter<String> vs = new TopNCounter<String>(10);
+ String[] stream = {"X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B"};
+ for (String i : stream) {
+ vs.offer(i);
+ }
+
+
+ String[] stream2 = {"B", "B", "Z", "Z", "B", "C", "X", "X"};
+ TopNCounter<String> vs2 = new TopNCounter<String>(10);
+ for (String i : stream2) {
+ vs2.offer(i);
+ }
+ // X: 4+2, C: 2+1, A: 3+0, B: 2+3, Y: 1+0, Z: 1+2
+ vs.merge(vs2);
+ List<Counter<String>> topK = vs.topK(3);
+ for (Counter<String> c : topK) {
+ assertTrue(Arrays.asList("A", "B", "X").contains(c.getItem()));
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index afde8f7..cc0557e 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -29,37 +29,33 @@ public class TopNCounterCombinationTest extends TopNCounterTest {
public static Collection<Integer[]> configs() {
// return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
return Arrays.asList(new Integer[][] {
- /*
// with 20X space
- { 100, 10, 20 }, // top 100 among 1,000 keys (top 10%)
- { 100, 20, 20 }, // top 100 among 2,000 keys (top 5%)
- { 100, 100, 20 }, // top 100 among 10,000 keys (top 1%)
- { 100, 1000, 20 }, // top 100 among 100,000 keys (top 0.1%)
-
- */
+ { 10, 20 }, // top 10%
+ { 20, 20 }, // top 5%
+ { 100, 20 }, // top 1%
+ { 1000, 20 }, // top 0.1%
+
// with 50X space
- { 100, 10, 50 }, // top 100 among 1,000 keys (top 10%)
- { 100, 20, 50 }, // top 100 among 2,000 keys (top 5%)
- { 100, 100, 50 }, // top 100 among 10,000 keys (top 1%)
- { 100, 1000, 50 }, // top 100 among 100,000 keys (top 0.1%)
+ { 10, 50 }, // top 10%
+ { 20, 50 }, // top 5%
+ { 100, 50 }, // top 1%
+ { 1000, 50 }, // top 0.1%
- /*
// with 100X space
- { 100, 10, 100 }, // top 100 among 1,000 keys (top 10%)
- { 100, 20, 100 }, // top 100 among 2,000 keys (top 5%)
- { 100, 100, 100 }, // top 100 among 10,000 keys (top 1%)
- { 100, 1000, 100 }, // top 100 among 100,000 keys (top 0.1%)
- */
+ { 10, 100 }, // top 10%
+ { 20, 100 }, // top 5%
+ { 100, 100 }, // top 1%
+ { 1000, 100 }, // top 0.1%
});
}
- public TopNCounterCombinationTest(int topK, int keySpaceRate, int spaceSavingRate) throws Exception {
+ public TopNCounterCombinationTest(int keySpaceRate, int spaceSavingRate) throws Exception {
super();
- this.TOP_K = topK;
+ this.TOP_K = 100;
this.KEY_SPACE = TOP_K * keySpaceRate;
this.SPACE_SAVING_ROOM = spaceSavingRate;
TOTAL_RECORDS = 1000000; // 1 million
- this.PARALLEL = 50;
- this.verbose = false;
+ this.PARALLEL = 10;
+ this.verbose = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index e5350ba..edc9ce2 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -41,7 +41,7 @@ public class TopNCounterTest {
protected static int SPACE_SAVING_ROOM;
protected static int PARALLEL = 10;
-
+
protected static boolean verbose = true;
public TopNCounterTest() {
@@ -60,7 +60,7 @@ public class TopNCounterTest {
outputMsg("Start to create test random data...");
long startTime = System.currentTimeMillis();
- ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE - 1, 0.8);
+ ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE, 0.5);
int keyIndex;
File tempFile = File.createTempFile("ZipfDistribution", ".txt");
@@ -70,7 +70,7 @@ public class TopNCounterTest {
FileWriter fw = new FileWriter(tempFile);
try {
for (int i = 0; i < TOTAL_RECORDS; i++) {
- keyIndex = zipf.sample();
+ keyIndex = zipf.sample() -1;
fw.write(allKeys[keyIndex]);
fw.write('\n');
}
@@ -85,7 +85,7 @@ public class TopNCounterTest {
return tempFile.getAbsolutePath();
}
- @Test
+ //@Test
public void testSingleSpaceSaving() throws IOException {
String dataFile = prepareTestDate();
TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
@@ -163,7 +163,7 @@ public class TopNCounterTest {
if (consumers.length == 1)
return consumers;
- TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * 10);
+ TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * PARALLEL);
for (int i=0, n=consumers.length; i<n; i++) {
merged.vs.merge(consumers[i].vs);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
index ddf0782..dadf137 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -57,6 +57,8 @@ abstract public class MeasureAggregator<V> implements Serializable {
return new BigDecimalMinAggregator();
else if (isDouble(returnType))
return new DoubleMinAggregator();
+ } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
+ return new TopNAggregator();
}
throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
new file mode 100644
index 0000000..9cea4cc
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+
+/**
+ *
+ */
+public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
+
+ int capacity = 0;
+ TopNCounter<ByteArray> sum = null;
+
+ @Override
+ public void reset() {
+ sum = null;
+ }
+
+ @Override
+ public void aggregate(TopNCounter<ByteArray> value) {
+ if (sum == null) {
+ sum = new TopNCounter<ByteArray>(Integer.MAX_VALUE);
+ capacity = value.getCapacity();
+ }
+
+ sum.merge(value);
+ }
+
+ @Override
+ public TopNCounter getState() {
+
+ sum.retain(capacity);
+ return sum;
+ }
+
+ @Override
+ public int getMemBytesEstimate() {
+ // 1024 + 60 returned by AggregationCacheMemSizeTest
+ return 8 * capacity;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
new file mode 100644
index 0000000..1c6c442
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.model.DataType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
+
+ // be thread-safe and avoid repeated obj creation
+ private ThreadLocal<TopNCounter<ByteArray>> current = new ThreadLocal<TopNCounter<ByteArray>>();
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return 0;
+ }
+
+ @Override
+ public int maxLength() {
+ return 0;
+ }
+
+ @Override
+ public TopNCounter<ByteArray> valueOf(byte[] value) {
+ return null;
+ }
+
+ @Override
+ public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+
+ }
+
+ @Override
+ public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
index 2b2cbed..95262b4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
@@ -36,7 +36,7 @@ public class DataType implements Serializable {
public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
+ "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
- + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc" //
+ + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn" //
+ "|" + TblColRef.InnerDataTypeEnum.LITERAL.getDataType() //
+ "|" + TblColRef.InnerDataTypeEnum.DERIVED.getDataType();
@@ -184,6 +184,8 @@ public class DataType implements Serializable {
return 8;
} else if (isHLLC()) {
return 1 << precision;
+ } else if (isTopN()) {
+ return 8 * precision * 50;
}
throw new IllegalStateException("The return type : " + name + " is not recognized;");
}
@@ -251,6 +253,10 @@ public class DataType implements Serializable {
public boolean isHLLC() {
return name.equals("hllc");
}
+
+ public boolean isTopN() {
+ return name.equals("topn");
+ }
public String getName() {
return name;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index e3f4c48..b87d50c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -34,10 +34,11 @@ public class FunctionDesc {
public static final String FUNC_MAX = "MAX";
public static final String FUNC_COUNT = "COUNT";
public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+ public static final String FUNC_TOP_N = "TOP_N";
public static final String PARAMTER_TYPE_CONSTANT = "constant";
public static final String PARAMETER_TYPE_COLUMN = "column";
-
+
@JsonProperty("expression")
private String expression;
@JsonProperty("parameter")
@@ -92,6 +93,10 @@ public class FunctionDesc {
return FUNC_COUNT_DISTINCT.equalsIgnoreCase(expression);
}
+ public boolean isTopN() {
+ return FUNC_TOP_N.equalsIgnoreCase(expression);
+ }
+
public boolean isHolisticCountDistinct() {
if (isCountDistinct() && returnDataType != null && returnDataType.isBigInt()) {
return true;
@@ -138,7 +143,7 @@ public class FunctionDesc {
}
public DataType getSQLType() {
- if (isCountDistinct())
+ if (isCountDistinct() || isTopN())
return DataType.ANY;
else if (isSum() || isMax() || isMin())
return parameter.getColRefs().get(0).getType();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bfafeed2/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 1565862..bdb4b91 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -39,6 +39,9 @@ public class ParameterDesc {
private String type;
@JsonProperty("value")
private String value;
+
+ @JsonProperty("counter")
+ private String counter;
private List<TblColRef> colRefs;
@@ -62,6 +65,14 @@ public class ParameterDesc {
this.value = value;
}
+ public String getCounter() {
+ return counter;
+ }
+
+ public void setCounter(String counter) {
+ this.counter = counter;
+ }
+
public List<TblColRef> getColRefs() {
return colRefs;
}
@@ -85,39 +96,30 @@ public class ParameterDesc {
}
@Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((type == null) ? 0 : type.hashCode());
- result = prime * result + ((value == null) ? 0 : value.hashCode());
- return result;
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ParameterDesc that = (ParameterDesc) o;
+
+ if (counter != null ? !counter.equals(that.counter) : that.counter != null) return false;
+ if (type != null ? !type.equals(that.type) : that.type != null) return false;
+ if (value != null ? !value.equals(that.value) : that.value != null) return false;
+
+ return true;
}
@Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ParameterDesc other = (ParameterDesc) obj;
- if (type == null) {
- if (other.type != null)
- return false;
- } else if (!type.equals(other.type))
- return false;
- if (value == null) {
- if (other.value != null)
- return false;
- } else if (!value.equals(other.value))
- return false;
- return true;
+ public int hashCode() {
+ int result = type != null ? type.hashCode() : 0;
+ result = 31 * result + (value != null ? value.hashCode() : 0);
+ result = 31 * result + (counter != null ? counter.hashCode() : 0);
+ return result;
}
@Override
public String toString() {
- return "ParameterDesc [type=" + type + ", value=" + value + "]";
+ return "ParameterDesc [type=" + type + ", value=" + value + ", counter=" + counter + "]";
}
}