You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/19 03:02:14 UTC
[27/50] [abbrv] kylin git commit: KYLIN-2088 Support intersect count for calculation of retention or conversion rates
KYLIN-2088 Support intersect count for calculation of retention or conversion rates
Signed-off-by: Yang Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b4c970ad
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b4c970ad
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b4c970ad
Branch: refs/heads/master-hbase1.x
Commit: b4c970adf18362daade77e936693dac08c0639e1
Parents: 61a08d4
Author: sunyerui <su...@gmail.com>
Authored: Wed Oct 12 20:59:54 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Oct 16 08:10:05 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 +-
.../org/apache/kylin/measure/MeasureType.java | 5 ++
.../kylin/measure/bitmap/BitmapCounter.java | 32 +++++++
.../BitmapIntersectDistinctCountAggFunc.java | 94 ++++++++++++++++++++
.../kylin/measure/bitmap/BitmapMeasureType.java | 9 ++
.../apache/kylin/query/ITKylinQueryTest.java | 6 ++
.../query/sql_intersect_count/query00.sql | 32 +++++++
.../kylin/query/relnode/OLAPAggregateRel.java | 16 +++-
8 files changed, 195 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 99c3c5a..7dacd06 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -765,7 +765,9 @@ abstract public class KylinConfigBase implements Serializable {
}
public Map<String, String> getUDFs() {
- return getPropertiesByPrefix("kylin.query.udf.");
+ Map<String, String> udfMap = getPropertiesByPrefix("kylin.query.udf.");
+ // Register the built-in intersect_count UDAF. It is put after the configured UDFs, so it
+ // replaces any configured entry under the same key.
+ // NOTE(review): assumes getPropertiesByPrefix returns a fresh, mutable map whose keys have the
+ // prefix stripped (e.g. "intersect_count") - confirm against its implementation.
+ udfMap.put("intersect_count", "org.apache.kylin.measure.bitmap.BitmapIntersectDistinctCountAggFunc");
+ return udfMap;
}
public int getHBaseMaxConnectionThreads() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 82618e9..e7312f2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -115,6 +115,11 @@ abstract public class MeasureType<T> {
/** Returns a Calcite aggregation function implementation class */
abstract public Class<?> getRewriteCalciteAggrFunctionClass();
+ /**
+  * Returns the Calcite aggregation function implementation class for the given aggregate call name.
+  * Some measure types return a different class depending on the call name (e.g. BitmapMeasureType).
+  * This default ignores the call name and delegates to {@link #getRewriteCalciteAggrFunctionClass()}.
+  *
+  * @param callName name of the aggregate call being rewritten; may be ignored by implementations
+  * @return the implementation class, as returned by the no-argument overload by default
+  */
+ public Class<?> getRewriteCalciteAggrFunctionClass(String callName) {
+ return getRewriteCalciteAggrFunctionClass();
+ }
+
/* ============================================================================
* Storage
* ---------------------------------------------------------------------------- */
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
index d3b57a7..827390d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
@@ -47,6 +47,12 @@ public class BitmapCounter implements Comparable<BitmapCounter> {
bitmap.clear();
}
+    /**
+     * Returns a new counter holding a copy of this counter's bitmap, obtained via the bitmap's own
+     * {@code clone()}. Unlike {@link Object#clone()}, this does not require {@link Cloneable}
+     * and never throws {@link CloneNotSupportedException}.
+     *
+     * @return a new {@code BitmapCounter} with a cloned bitmap
+     */
+    @Override
+    public BitmapCounter clone() {
+        BitmapCounter copy = new BitmapCounter();
+        copy.bitmap = bitmap.clone();
+        return copy;
+    }
+
public void add(int value) {
bitmap.add(value);
}
@@ -74,6 +80,10 @@ public class BitmapCounter implements Comparable<BitmapCounter> {
this.bitmap.or(another.bitmap);
}
+    /**
+     * Intersects this counter in place with {@code another} by AND-ing the two bitmaps; the
+     * result is stored in this counter and {@code another} is left untouched.
+     *
+     * @param another the counter to intersect with
+     */
+    public void intersect(BitmapCounter another) {
+        bitmap.and(another.bitmap);
+    }
+
public long getCount() {
return this.bitmap.getCardinality();
}
@@ -107,6 +117,28 @@ public class BitmapCounter implements Comparable<BitmapCounter> {
}
@Override
+    public String toString() {
+        // Short form: "(count)" followed by the whole bitmap when it holds at most 10 values;
+        // otherwise "(count){v1,...,v10,...}" listing only the first 10 values.
+        final long count = getCount();
+        if (count > 10) {
+            StringBuilder buf = new StringBuilder("(" + count + "){");
+            int shown = 0;
+            for (Integer v : bitmap) {
+                if (shown >= 10) {
+                    buf.append("...");
+                    break;
+                }
+                buf.append(v).append(',');
+                shown++;
+            }
+            return buf.append('}').toString();
+        }
+        return "(" + count + ")" + bitmap.toString();
+    }
+
+ @Override
public int hashCode() {
final int prime = 31;
int result = 1;
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
new file mode 100644
index 0000000..cf42d1b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.measure.bitmap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BitmapIntersectDistinctCountAggFunc is an UDAF used for calculating the intersection of two or more bitmaps
+ * Usage: intersect_count(columnToCount, columnToFilter, filterList)
+ * Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps
+ * requires an bitmap count distinct measure of uuid, and an dimension of event
+ */
+/**
+ * BitmapIntersectDistinctCountAggFunc is a UDAF that calculates the cardinality of the intersection of two or more bitmaps.
+ * Usage: intersect_count(columnToCount, columnToFilter, filterList)
+ * Example: intersect_count(uuid, event, array['A', 'B', 'C']) counts the uuids present in all three bitmaps A, B and C.
+ * Requires a bitmap count distinct measure on uuid and a dimension on event.
+ */
+public class BitmapIntersectDistinctCountAggFunc {
+    private static final Logger logger = LoggerFactory.getLogger(BitmapIntersectDistinctCountAggFunc.class);
+
+    /**
+     * Accumulator holding one merged bitmap per filter-column value, plus the filter list whose
+     * bitmaps are intersected when the final result is computed.
+     */
+    public static class RetentionPartialResult {
+        Map<Object, BitmapCounter> map;
+        List<?> keyList;
+
+        public RetentionPartialResult() {
+            map = new LinkedHashMap<>();
+        }
+
+        /**
+         * Merges {@code value} into the bitmap kept for {@code key}.
+         *
+         * @param key     value of the filter column
+         * @param keyList filter list; the first non-null list passed in is kept
+         * @param value   bitmap to merge, expected to be a {@link BitmapCounter};
+         *                {@code null} is treated as an empty bitmap
+         */
+        public void add(Object key, List<?> keyList, Object value) {
+            if (this.keyList == null) {
+                this.keyList = keyList;
+            }
+            BitmapCounter counter = map.get(key);
+            if (counter == null) {
+                counter = new BitmapCounter();
+                map.put(key, counter);
+            }
+            // Guard against null input; BitmapCounter.merge(null) would throw a NullPointerException
+            if (value != null) {
+                counter.merge((BitmapCounter) value);
+            }
+        }
+
+        /**
+         * Returns the number of values present in the bitmaps of every key in the filter list.
+         *
+         * @return the intersection cardinality, or 0 when the filter list is null/empty or any
+         *         of its keys has no bitmap
+         */
+        public long result() {
+            if (keyList == null || keyList.isEmpty()) {
+                return 0;
+            }
+            BitmapCounter intersection = null;
+            for (Object key : keyList) {
+                BitmapCounter c = map.get(key);
+                if (c == null) {
+                    // A key in the filter list has no data, so the intersection is empty
+                    return 0;
+                }
+                if (intersection == null) {
+                    // Copy the first bitmap so intersecting does not modify the accumulated bitmaps
+                    intersection = c.clone();
+                } else {
+                    intersection.intersect(c);
+                }
+            }
+            return intersection.getCount();
+        }
+    }
+
+    /**
+     * Creates an empty accumulator.
+     *
+     * @return a new, empty partial result
+     */
+    public static RetentionPartialResult init() {
+        return new RetentionPartialResult();
+    }
+
+    /**
+     * Folds one input into {@code result}.
+     *
+     * @param result  accumulator created by {@link #init()}
+     * @param value   bitmap of the counted column, expected to be a {@link BitmapCounter}
+     * @param key     value of the filter column
+     * @param keyList filter list of values whose bitmaps are intersected
+     * @return the same accumulator, updated
+     */
+    public static RetentionPartialResult add(RetentionPartialResult result, Object value, Object key, List<?> keyList) {
+        result.add(key, keyList, value);
+        return result;
+    }
+
+    /**
+     * Identical to {@link #add(RetentionPartialResult, Object, Object, List)}.
+     *
+     * @return the same accumulator, updated
+     */
+    public static RetentionPartialResult merge(RetentionPartialResult result, Object value, Object key, List<?> keyList) {
+        return add(result, value, key, keyList);
+    }
+
+    /**
+     * Computes the final intersection count of {@code result}.
+     *
+     * @param result accumulator to evaluate
+     * @return the intersection cardinality (see {@link RetentionPartialResult#result()})
+     */
+    public static long result(RetentionPartialResult result) {
+        return result.result();
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index be96eb5..2b88e21 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -39,6 +39,7 @@ import org.apache.kylin.metadata.model.TblColRef;
*/
public class BitmapMeasureType extends MeasureType<BitmapCounter> {
public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+ public static final String FUNC_INTERSECT_COUNT_DISTINCT = "INTERSECT_COUNT";
public static final String DATATYPE_BITMAP = "bitmap";
public static class Factory extends MeasureTypeFactory<BitmapCounter> {
@@ -160,6 +161,14 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
return BitmapDistinctCountAggFunc.class;
}
+    /**
+     * Chooses the Calcite aggregate implementation from the call name: {@code INTERSECT_COUNT}
+     * (case-insensitive) selects {@link BitmapIntersectDistinctCountAggFunc}; any other name,
+     * including {@code null}, selects {@link BitmapDistinctCountAggFunc}.
+     */
+    @Override
+    public Class<?> getRewriteCalciteAggrFunctionClass(String callName) {
+        boolean isIntersect = FUNC_INTERSECT_COUNT_DISTINCT.equalsIgnoreCase(callName);
+        return isIntersect ? BitmapIntersectDistinctCountAggFunc.class : BitmapDistinctCountAggFunc.class;
+    }
+
// In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary
private boolean needDictionaryColumn(FunctionDesc functionDesc) {
DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 59a3a04..a0706ca 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -355,4 +355,10 @@ public class ITKylinQueryTest extends KylinTestBase {
this.batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_window");
}
+ @Test
+ public void testIntersectCountQuery() throws Exception {
+ // Results are not compared with H2, because H2 does not support intersect_count yet.
+ this.batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_intersect_count");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql b/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql
new file mode 100644
index 0000000..15e274a
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql
@@ -0,0 +1,32 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Per week: intersect_count of seller_id over single lstg_format_name values (a, b, c) and over
+-- combinations of them (ab, ac, abc), alongside count(distinct seller_id) and count(*).
+select
+week_beg_dt as week,
+intersect_count( seller_id, lstg_format_name, array['FP-GTC']) as a,
+intersect_count( seller_id, lstg_format_name, array['Auction']) as b,
+intersect_count( seller_id, lstg_format_name, array['Others']) as c,
+intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Auction']) as ab,
+intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Others']) as ac,
+intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Auction', 'Others']) as abc,
+count(distinct seller_id) as sellers,
+count(*) as cnt
+from test_kylin_fact left join edw.test_cal_dt on test_kylin_fact.cal_dt = edw.test_cal_dt.CAL_DT
+where week_beg_dt in (DATE '2013-12-22', DATE '2012-06-23')
+group by week_beg_dt
+
http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 97efb27..8ecb808 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -56,6 +56,7 @@ import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.measure.bitmap.BitmapMeasureType;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -79,6 +80,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
AGGR_FUNC_MAP.put("$SUM0", "SUM");
AGGR_FUNC_MAP.put("COUNT", "COUNT");
AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT");
+ AGGR_FUNC_MAP.put(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT, "COUNT_DISTINCT");
AGGR_FUNC_MAP.put("MAX", "MAX");
AGGR_FUNC_MAP.put("MIN", "MIN");
@@ -224,6 +226,15 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
Set<TblColRef> columns = inputColumnRowType.getSourceColumnsByIndex(i);
this.groups.addAll(columns);
}
+ // Some UDAF may group data by itself, add group key into groups, prevents aggregate at cube storage server side
+ for (AggregateCall aggCall : this.rewriteAggCalls) {
+ String aggregateName = aggCall.getAggregation().getName();
+ if (aggregateName.equalsIgnoreCase(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT)) {
+ int index = aggCall.getArgList().get(1);
+ TblColRef column = inputColumnRowType.getColumnByIndex(index);
+ groups.add(column);
+ }
+ }
}
private void buildAggregations() {
@@ -380,16 +391,17 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
}
// rebuild function
+ String callName = aggCall.getAggregation().getName();
RelDataType fieldType = aggCall.getType();
SqlAggFunction newAgg = aggCall.getAggregation();
if (func.isCount()) {
newAgg = SqlStdOperatorTable.SUM0;
} else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() != null) {
- newAgg = createCustomAggFunction(func.getExpression(), fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass());
+ newAgg = createCustomAggFunction(callName, fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass(callName));
}
// rebuild aggregate call
- AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, newAgg.getName());
+ AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName);
return newAggCall;
}