You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/31 06:59:51 UTC
[01/21] kylin git commit: KYLIN-2508,
Convert times to UTC when setting the cube build range
[Forced Update!]
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2501 498aa2562 -> d045a045a (forced update)
KYLIN-2508, Convert times to UTC when setting the cube build range
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3e6992da
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3e6992da
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3e6992da
Branch: refs/heads/KYLIN-2501
Commit: 3e6992da3b844d4642985f732c3269e354fed88c
Parents: 5155994
Author: luguosheng <55...@qq.com>
Authored: Wed Mar 15 17:21:39 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 15 18:35:50 2017 +0800
----------------------------------------------------------------------
webapp/app/js/directives/directives.js | 77 +++++++++++++++--------------
1 file changed, 39 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3e6992da/webapp/app/js/directives/directives.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/directives/directives.js b/webapp/app/js/directives/directives.js
index ca08493..bca3b03 100644
--- a/webapp/app/js/directives/directives.js
+++ b/webapp/app/js/directives/directives.js
@@ -249,49 +249,50 @@ KylinApp.directive('kylinPagination', function ($parse, $q) {
}
};
}).directive('dateTimepickerTimezone', function () {
- // this directive workaround to convert GMT0 timestamp to GMT date for datepicker
- return {
- restrict: 'A',
- priority: 1,
- require: 'ngModel',
- link: function (scope, element, attrs, ctrl) {
- ctrl.$formatters.push(function (value) {
-
- //set null for 0
- if(value===0){
- return '';
- }
-
- //return value;
- var newDate = new Date(value + (60000 * new Date().getTimezoneOffset()));
-
- var year = newDate.getFullYear();
- var month = (newDate.getMonth()+1)<10?'0'+(newDate.getMonth()+1):(newDate.getMonth()+1);
- var date = newDate.getDate()<10?'0'+newDate.getDate():newDate.getDate();
+ return {
+ restrict: 'A',
+ priority: 1,
+ require: 'ngModel',
+ link: function (scope, element, attrs, ctrl) {
+ ctrl.$formatters.push(function (value) {
- var hour = newDate.getHours()<10?'0'+newDate.getHours():newDate.getHours();
- var mins = newDate.getMinutes()<10?'0'+newDate.getMinutes():newDate.getMinutes();
- var seconds = newDate.getSeconds()<10?'0'+newDate.getSeconds():newDate.getSeconds();
+ //set null for 0
+ if(value===0){
+ return '';
+ }
- var viewVal = year+"-"+month+"-"+date+" "+hour+":"+mins+":"+seconds;
- return viewVal;
- });
+ //return value;
+ var newDate = new Date(value);
+ var year = newDate.getUTCFullYear();
+ var month = (newDate.getUTCMonth()+1)<10?'0'+(newDate.getUTCMonth()+1):(newDate.getUTCMonth()+1);
+ var date = newDate.getUTCDate()<10?'0'+newDate.getUTCDate():newDate.getUTCDate();
+ var hour = newDate.getUTCHours()<10?'0'+newDate.getUTCHours():newDate.getUTCHours();
+ var mins = newDate.getUTCMinutes()<10?'0'+newDate.getUTCMinutes():newDate.getUTCMinutes();
+ var seconds = newDate.getUTCSeconds()<10?'0'+newDate.getUTCSeconds():newDate.getUTCSeconds();
+ var viewVal = year+"-"+month+"-"+date+" "+hour+":"+mins+":"+seconds;
+ return viewVal;
+ });
- ctrl.$parsers.push(function (value) {
- var date;
- if(/^\d{4}-\d{1,2}-\d{1,2}(\s+\d{1,2}:\d{1,2}:\d{1,2})?$/.test(value)) {
- date=new Date(value);
- if(!date||date&&!date.getTime()){
- return value;
+ ctrl.$parsers.push(function (value) {
+ var date;
+ if(/^\d{4}-\d{1,2}-\d{1,2}(\s+\d{1,2}:\d{1,2}:\d{1,2})?$/.test(value)) {
+ date=new Date(value);
+ if(!date||date&&!date.getTime()){
+ return value;
+ }else{
+ var dateSplit=value.replace(/^\s+|\s+$/,'').replace(/\s+/,'-').split(/[:-]/);
+ var resultDate=[];
+ for(var i=0;i<6;i++){
+ resultDate[i]=dateSplit[i]||0;
+ }
+ return Date.UTC(resultDate[0],resultDate[1]-1,resultDate[2],resultDate[3],resultDate[4],resultDate[5]);
+ }
}else{
- return date.getTime()-(60000 * date.getTimezoneOffset());
+ return value;
}
- }else{
- return value;
- }
- });
- }
- };
+ });
+ }
+ };
}).directive("parametertree", function($compile) {
return {
restrict: "E",
[07/21] kylin git commit: KYLIN-2280 minor,
reset to init before modifying ports
Posted by li...@apache.org.
KYLIN-2280 minor, reset to init before modifying ports
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/40249fee
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/40249fee
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/40249fee
Branch: refs/heads/KYLIN-2501
Commit: 40249fee78a8a078d461ea66009111082732d27b
Parents: 6c800d6
Author: xiefan46 <95...@qq.com>
Authored: Fri Mar 17 11:27:08 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 17 12:53:11 2017 +0800
----------------------------------------------------------------------
build/bin/kylin_port_replace_util.sh | 15 ++++++++++-----
build/script/download-tomcat.sh | 1 +
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/40249fee/build/bin/kylin_port_replace_util.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin_port_replace_util.sh b/build/bin/kylin_port_replace_util.sh
index a51e60d..366d8d0 100755
--- a/build/bin/kylin_port_replace_util.sh
+++ b/build/bin/kylin_port_replace_util.sh
@@ -31,6 +31,7 @@ else
fi
#variables
+TOMCAT_INIT_FILE="${KYLIN_HOME}/tomcat/conf/server.xml.init"
TOMCAT_BACKUP_FILE="${KYLIN_HOME}/tomcat/conf/server.xml.backup"
TOMCAT_CONFIG_FILE="${KYLIN_HOME}/tomcat/conf/server.xml"
KYLIN_CONFIG_FILE="${KYLIN_HOME}/conf/kylin.properties"
@@ -49,6 +50,15 @@ then
exit 1
fi
+
+ #backup tomccat file
+ if [ ! -f ${TOMCAT_BACKUP_FILE} ]; then
+ cp -f ${TOMCAT_CONFIG_FILE} ${TOMCAT_BACKUP_FILE}
+ fi
+
+ #force reset
+ cp -f ${TOMCAT_INIT_FILE} ${TOMCAT_CONFIG_FILE} #reset if exist
+
#back or reset
if [ ! -f ${KYLIN_BACKUP_FILE} ]; then #backup if not exist
cp -f ${KYLIN_CONFIG_FILE} ${KYLIN_BACKUP_FILE}
@@ -56,11 +66,6 @@ then
cp -r ${KYLIN_BACKUP_FILE} ${KYLIN_CONFIG_FILE} #reset if exist
fi
- if [ ! -f ${TOMCAT_BACKUP_FILE} ]; then #backup if not exist
- cp -f ${TOMCAT_CONFIG_FILE} ${TOMCAT_BACKUP_FILE}
- else
- cp -r ${TOMCAT_BACKUP_FILE} ${TOMCAT_CONFIG_FILE} #reset if exist
- fi
#replace ports in kylin.properties
new_kylin_port=`expr ${KYLIN_DEFAULT_PORT} + ${OFFSET}`
http://git-wip-us.apache.org/repos/asf/kylin/blob/40249fee/build/script/download-tomcat.sh
----------------------------------------------------------------------
diff --git a/build/script/download-tomcat.sh b/build/script/download-tomcat.sh
index 964400f..b0ff137 100755
--- a/build/script/download-tomcat.sh
+++ b/build/script/download-tomcat.sh
@@ -51,6 +51,7 @@ rm -rf build/tomcat/webapps/*
mv build/tomcat/conf/server.xml build/tomcat/conf/server.xml.bak
mv build/tomcat/conf/context.xml build/tomcat/conf/context.xml.bak
cp build/deploy/server.xml build/tomcat/conf/server.xml
+cp build/deploy/server.xml build/tomcat/conf/server.xml.init
echo "server.xml overwritten..."
cp build/deploy/context.xml build/tomcat/conf/context.xml
echo "context.xml overwritten..."
[02/21] kylin git commit: KYLIN-2344 remove spark-defaults.conf
Posted by li...@apache.org.
KYLIN-2344 remove spark-defaults.conf
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/32034fc1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/32034fc1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/32034fc1
Branch: refs/heads/KYLIN-2501
Commit: 32034fc1d0cb96156da8c4b49c35f2631a59079b
Parents: 3e6992d
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 15 20:17:01 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 15 20:17:19 2017 +0800
----------------------------------------------------------------------
build/deploy/spark-defaults.conf | 5 -----
build/script/download-spark.sh | 2 --
2 files changed, 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/32034fc1/build/deploy/spark-defaults.conf
----------------------------------------------------------------------
diff --git a/build/deploy/spark-defaults.conf b/build/deploy/spark-defaults.conf
deleted file mode 100644
index 78a4bc9..0000000
--- a/build/deploy/spark-defaults.conf
+++ /dev/null
@@ -1,5 +0,0 @@
-spark.yarn.submit.file.replication=1
-spark.yarn.max.executor.failures=3
-spark.driver.extraJavaOptions=-Dhdp.version=current
-spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-spark.executor.extraJavaOptions=-Dhdp.version=current
http://git-wip-us.apache.org/repos/asf/kylin/blob/32034fc1/build/script/download-spark.sh
----------------------------------------------------------------------
diff --git a/build/script/download-spark.sh b/build/script/download-spark.sh
index 3ea7d08..b73331e 100755
--- a/build/script/download-spark.sh
+++ b/build/script/download-spark.sh
@@ -53,5 +53,3 @@ rm -rf build/spark/lib/spark-examples-*
rm -rf build/spark/examples
rm -rf build/spark/data
rm -rf build/spark/R
-
-cp build/deploy/spark-defaults.conf build/spark/conf/spark-defaults.conf
\ No newline at end of file
[17/21] kylin git commit: #KYLIN-490 support multiple column distinct
count
Posted by li...@apache.org.
#KYLIN-490 support multiple column distinct count
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/636282db
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/636282db
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/636282db
Branch: refs/heads/KYLIN-2501
Commit: 636282db889973fe29269b43e417414effb68b76
Parents: f72a3f6
Author: Roger Shi <ro...@hotmail.com>
Authored: Wed Mar 22 19:22:22 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 27 15:36:07 2017 +0800
----------------------------------------------------------------------
.../BitmapIntersectDistinctCountAggFunc.java | 9 +-
.../measure/percentile/PercentileAggFunc.java | 9 +-
.../kylin/metadata/model/FunctionDesc.java | 62 ++++++---
.../kylin/metadata/model/ParameterDesc.java | 135 +++++++++++++++++--
.../kylin/query/relnode/OLAPAggregateRel.java | 86 +++++++++---
5 files changed, 250 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
index cd4d306..a1e2665 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -17,6 +17,8 @@
*/
package org.apache.kylin.measure.bitmap;
+import org.apache.kylin.measure.ParamAsMeasureCount;
+
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -27,9 +29,14 @@ import java.util.Map;
* Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps
* requires an bitmap count distinct measure of uuid, and an dimension of event
*/
-public class BitmapIntersectDistinctCountAggFunc {
+public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount {
private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
+ @Override
+ public int getParamAsMeasureCount() {
+ return -2;
+ }
+
public static class RetentionPartialResult {
Map<Object, BitmapCounter> map;
List keyList;
http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
index ad02019..d3cec8f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
@@ -18,7 +18,9 @@
package org.apache.kylin.measure.percentile;
-public class PercentileAggFunc {
+import org.apache.kylin.measure.ParamAsMeasureCount;
+
+public class PercentileAggFunc implements ParamAsMeasureCount{
public static PercentileCounter init() {
return null;
}
@@ -41,4 +43,9 @@ public class PercentileAggFunc {
public static double result(PercentileCounter counter) {
return counter == null ? 0L : counter.getResultEstimate();
}
+
+ @Override
+ public int getParamAsMeasureCount() {
+ return 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index cbd7574..61c5fac 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,22 +18,26 @@
package org.apache.kylin.metadata.model;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
/**
*/
@@ -48,7 +52,7 @@ public class FunctionDesc implements Serializable {
r.returnDataType = DataType.getType(returnType);
return r;
}
-
+
public static final String FUNC_SUM = "SUM";
public static final String FUNC_MIN = "MIN";
public static final String FUNC_MAX = "MAX";
@@ -95,7 +99,7 @@ public class FunctionDesc implements Serializable {
}
}
- if(parameter != null)
+ if (parameter != null)
parameter.setColRefs(colRefs);
}
@@ -140,6 +144,8 @@ public class FunctionDesc implements Serializable {
return getParameter().getValue();
} else if (isCount()) {
return "_KY_" + "COUNT__"; // ignores parameter, count(*), count(1), count(col) are all the same
+ } else if (isCountDistinct()) {
+ return "_KY_" + getFullExpressionInAlphabetOrder().replaceAll("[(),. ]", "_");
} else {
return "_KY_" + getFullExpression().replaceAll("[(),. ]", "_");
}
@@ -197,6 +203,25 @@ public class FunctionDesc implements Serializable {
return sb.toString();
}
+ /**
+ * Parameters' name appears in alphabet order.
+ * This method is used for funcs whose parameters appear in arbitrary order
+ */
+ public String getFullExpressionInAlphabetOrder() {
+ StringBuilder sb = new StringBuilder(expression);
+ sb.append("(");
+ ParameterDesc localParam = parameter;
+ List<String> flatParams = Lists.newArrayList();
+ while (localParam != null) {
+ flatParams.add(localParam.getValue());
+ localParam = localParam.getNextParameter();
+ }
+ Collections.sort(flatParams);
+ sb.append(Joiner.on(",").join(flatParams));
+ sb.append(")");
+ return sb.toString();
+ }
+
public boolean isDimensionAsMetric() {
return isDimensionAsMetric;
}
@@ -264,13 +289,20 @@ public class FunctionDesc implements Serializable {
return false;
} else if (!expression.equals(other.expression))
return false;
- // NOTE: don't check the parameter of count()
- if (isCount() == false) {
+ if (isCountDistinct()) {
+ // for count distinct func, param's order doesn't matter
+ if (parameter == null) {
+ if (other.parameter != null)
+ return false;
+ } else {
+ return parameter.equalInArbitraryOrder(other.parameter);
+ }
+ } else if (!isCount()) { // NOTE: don't check the parameter of count()
if (parameter == null) {
if (other.parameter != null)
return false;
} else {
- if (!parameter.equals(other.parameter))
+ if (!parameter.equals(other.parameter))
return false;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 8ad20a8..5ba2f14 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -18,17 +18,19 @@
package org.apache.kylin.metadata.model;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Sets;
/**
*/
@@ -38,9 +40,9 @@ public class ParameterDesc implements Serializable {
public static ParameterDesc newInstance(Object... objs) {
if (objs.length == 0)
throw new IllegalArgumentException();
-
+
ParameterDesc r = new ParameterDesc();
-
+
Object obj = objs[0];
if (obj instanceof TblColRef) {
TblColRef col = (TblColRef) obj;
@@ -51,7 +53,7 @@ public class ParameterDesc implements Serializable {
r.type = FunctionDesc.PARAMETER_TYPE_CONSTANT;
r.value = (String) obj;
}
-
+
if (objs.length >= 2) {
r.nextParameter = newInstance(Arrays.copyOfRange(objs, 1, objs.length));
if (r.nextParameter.colRefs.size() > 0) {
@@ -63,7 +65,7 @@ public class ParameterDesc implements Serializable {
}
return r;
}
-
+
@JsonProperty("type")
private String type;
@JsonProperty("value")
@@ -74,6 +76,15 @@ public class ParameterDesc implements Serializable {
private ParameterDesc nextParameter;
private List<TblColRef> colRefs = ImmutableList.of();
+ private Set<PlainParameter> plainParameters = null;
+
+ // Lazy evaluation
+ public Set<PlainParameter> getPlainParameters() {
+ if (plainParameters == null) {
+ plainParameters = PlainParameter.createFromParameterDesc(this);
+ }
+ return plainParameters;
+ }
public String getType() {
return type;
@@ -86,7 +97,7 @@ public class ParameterDesc implements Serializable {
public String getValue() {
return value;
}
-
+
void setValue(String value) {
this.value = value;
}
@@ -94,7 +105,7 @@ public class ParameterDesc implements Serializable {
public List<TblColRef> getColRefs() {
return colRefs;
}
-
+
void setColRefs(List<TblColRef> colRefs) {
this.colRefs = colRefs;
}
@@ -118,7 +129,7 @@ public class ParameterDesc implements Serializable {
if (type != null ? !type.equals(that.type) : that.type != null)
return false;
-
+
ParameterDesc p = this, q = that;
int refi = 0, refj = 0;
for (; p != null && q != null; p = p.nextParameter, q = q.nextParameter) {
@@ -138,10 +149,24 @@ public class ParameterDesc implements Serializable {
return false;
}
}
-
+
return p == null && q == null;
}
+ public boolean equalInArbitraryOrder(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ParameterDesc that = (ParameterDesc) o;
+
+ Set<PlainParameter> thisPlainParams = this.getPlainParameters();
+ Set<PlainParameter> thatPlainParams = that.getPlainParameters();
+
+ return thisPlainParams.containsAll(thatPlainParams) && thatPlainParams.containsAll(thisPlainParams);
+ }
+
@Override
public int hashCode() {
int result = type != null ? type.hashCode() : 0;
@@ -154,4 +179,88 @@ public class ParameterDesc implements Serializable {
return "ParameterDesc [type=" + type + ", value=" + value + ", nextParam=" + nextParameter + "]";
}
+ /**
+ * PlainParameter is created to present ParameterDesc in List style.
+ * Compared to ParameterDesc its advantage is:
+ * 1. easy to compare without considering order
+ * 2. easy to compare one by one
+ */
+ private static class PlainParameter {
+ private String type;
+ private String value;
+ private TblColRef colRef = null;
+
+ private PlainParameter() {
+ }
+
+ public boolean isColumnType() {
+ return FunctionDesc.PARAMETER_TYPE_COLUMN.equals(type);
+ }
+
+ static Set<PlainParameter> createFromParameterDesc(ParameterDesc parameterDesc) {
+ Set<PlainParameter> result = Sets.newHashSet();
+ ParameterDesc local = parameterDesc;
+ List<TblColRef> totalColRef = parameterDesc.colRefs;
+ Integer colIndex = 0;
+ while (local != null) {
+ if (local.isColumnType()) {
+ result.add(createSingleColumnParameter(local, totalColRef.get(colIndex++)));
+ } else {
+ result.add(createSingleValueParameter(local));
+ }
+ local = local.nextParameter;
+ }
+ return result;
+ }
+
+ static PlainParameter createSingleValueParameter(ParameterDesc parameterDesc) {
+ PlainParameter single = new PlainParameter();
+ single.type = parameterDesc.type;
+ single.value = parameterDesc.value;
+ return single;
+ }
+
+ static PlainParameter createSingleColumnParameter(ParameterDesc parameterDesc, TblColRef colRef) {
+ PlainParameter single = new PlainParameter();
+ single.type = parameterDesc.type;
+ single.value = parameterDesc.value;
+ single.colRef = colRef;
+ return single;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = type != null ? type.hashCode() : 0;
+ result = 31 * result + (colRef != null ? colRef.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ PlainParameter that = (PlainParameter) o;
+
+ if (type != null ? !type.equals(that.type) : that.type != null)
+ return false;
+
+ if (this.isColumnType()) {
+ if (!that.isColumnType())
+ return false;
+ if (!this.colRef.equals(that.colRef)) {
+ return false;
+ }
+ } else {
+ if (that.isColumnType())
+ return false;
+ if (!this.value.equals(that.value))
+ return false;
+ }
+
+ return true;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 8d7c597..2c75a14 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -18,6 +18,7 @@
package org.apache.kylin.query.relnode;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -55,6 +56,7 @@ import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.ParamAsMeasureCount;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
@@ -71,6 +73,7 @@ import com.google.common.collect.Sets;
public class OLAPAggregateRel extends Aggregate implements OLAPRel {
private final static Map<String, String> AGGR_FUNC_MAP = new HashMap<String, String>();
+ private final static Map<String, Integer> AGGR_FUNC_PARAM_AS_MEASTURE_MAP = new HashMap<String, Integer>();
static {
AGGR_FUNC_MAP.put("SUM", "SUM");
@@ -84,6 +87,15 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
for (String udaf : udafFactories.keySet()) {
AGGR_FUNC_MAP.put(udaf, udafFactories.get(udaf).getAggrFunctionName());
}
+
+ Map<String, Class<?>> udafs = MeasureTypeFactory.getUDAFs();
+ for (String func : udafs.keySet()) {
+ try {
+ AGGR_FUNC_PARAM_AS_MEASTURE_MAP.put(func, ((ParamAsMeasureCount) (udafs.get(func).newInstance())).getParamAsMeasureCount());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
private static String getSqlFuncName(AggregateCall aggCall) {
@@ -235,12 +247,27 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
this.aggregations = new ArrayList<FunctionDesc>();
for (AggregateCall aggCall : this.rewriteAggCalls) {
ParameterDesc parameter = null;
+ // By default all args are included, UDFs can define their own in getParamAsMeasureCount method.
if (!aggCall.getArgList().isEmpty()) {
- // TODO: Currently only get the column of first param
- int index = aggCall.getArgList().get(0);
- TblColRef column = inputColumnRowType.getColumnByIndex(index);
- if (!column.isInnerColumn()) {
- parameter = ParameterDesc.newInstance(column);
+ List<TblColRef> columns = Lists.newArrayList();
+ String funcName = getSqlFuncName(aggCall);
+ int columnsCount = aggCall.getArgList().size();
+ if (AGGR_FUNC_PARAM_AS_MEASTURE_MAP.containsKey(funcName)) {
+ int asMeasureCnt = AGGR_FUNC_PARAM_AS_MEASTURE_MAP.get(funcName);
+ if (asMeasureCnt > 0) {
+ columnsCount = asMeasureCnt;
+ } else {
+ columnsCount += asMeasureCnt;
+ }
+ }
+ for (Integer index : aggCall.getArgList().subList(0, columnsCount)) {
+ TblColRef column = inputColumnRowType.getColumnByIndex(index);
+ if (!column.isInnerColumn()) {
+ columns.add(column);
+ }
+ }
+ if (!columns.isEmpty()) {
+ parameter = ParameterDesc.newInstance(columns.toArray(new TblColRef[columns.size()]));
}
}
String expression = getAggrFuncName(aggCall);
@@ -341,10 +368,11 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
AggregateCall aggCall = this.rewriteAggCalls.get(i);
if (!aggCall.getArgList().isEmpty()) {
- int index = aggCall.getArgList().get(0);
- TblColRef column = inputColumnRowType.getColumnByIndex(index);
- if (!column.isInnerColumn()) {
- this.context.metricsColumns.add(column);
+ for (Integer index : aggCall.getArgList()) {
+ TblColRef column = inputColumnRowType.getColumnByIndex(index);
+ if (!column.isInnerColumn()) {
+ this.context.metricsColumns.add(column);
+ }
}
}
}
@@ -385,18 +413,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
return aggCall;
}
- // rebuild parameters
- List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
- if (func.needRewriteField()) {
- RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
- if (newArgList.isEmpty()) {
- newArgList.add(field.getIndex());
- } else {
- // only the first column got overwritten
- newArgList.set(0, field.getIndex());
- }
- }
-
// rebuild function
String callName = getSqlFuncName(aggCall);
RelDataType fieldType = aggCall.getType();
@@ -408,12 +424,40 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
newAgg = createCustomAggFunction(callName, fieldType, udafMap.get(callName));
}
+ // rebuild parameters
+ List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
+ if (udafMap != null && udafMap.containsKey(callName)) {
+ newArgList = truncArgList(newArgList, udafMap.get(callName));
+ }
+ if (func.needRewriteField()) {
+ RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
+ if (newArgList.isEmpty()) {
+ newArgList.add(field.getIndex());
+ } else {
+ // TODO: only the first column got overwritten
+ newArgList.set(0, field.getIndex());
+ }
+ }
+
// rebuild aggregate call
@SuppressWarnings("deprecation")
AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName);
return newAggCall;
}
+ /**
+ * truncate Arg List according to UDAF's "add" method parameter count
+ */
+ private List<Integer> truncArgList(List<Integer> argList, Class<?> udafClazz) {
+ int argListLength = argList.size();
+ for (Method method : udafClazz.getMethods()) {
+ if (method.getName().equals("add")) {
+ argListLength = Math.min(method.getParameterTypes().length - 1, argListLength);
+ }
+ }
+ return argList.subList(0, argListLength);
+ }
+
private SqlAggFunction createCustomAggFunction(String funcName, RelDataType returnType, Class<?> customAggFuncClz) {
RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
SqlIdentifier sqlIdentifier = new SqlIdentifier(funcName, new SqlParserPos(1, 1));
[18/21] kylin git commit: Merge commit
'88a1c71dde855c693b230f67b92c4cd067d43b2b'
Posted by li...@apache.org.
Merge commit '88a1c71dde855c693b230f67b92c4cd067d43b2b'
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b1cc0dd6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b1cc0dd6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b1cc0dd6
Branch: refs/heads/KYLIN-2501
Commit: b1cc0dd643ebd55776a3f22b61ce33e2abf29d48
Parents: 636282d 88a1c71
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Mar 27 15:39:03 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 27 15:39:03 2017 +0800
----------------------------------------------------------------------
.../kylin/measure/ParamAsMeasureCount.java | 30 ++++++++++++++++++++
.../resources/query/sql_distinct/query08.sql | 24 ++++++++++++++++
2 files changed, 54 insertions(+)
----------------------------------------------------------------------
[10/21] kylin git commit: minor fix unused import
Posted by li...@apache.org.
minor fix unused import
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/19c87e7a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/19c87e7a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/19c87e7a
Branch: refs/heads/KYLIN-2501
Commit: 19c87e7a80b95a16a930feb6d0281555cd1232b5
Parents: 8b70fa5
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Mar 17 23:33:42 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 17 23:39:22 2017 +0800
----------------------------------------------------------------------
.../org/apache/kylin/metadata/model/JoinsTree.java | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/19c87e7a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index 224788c..c6df52e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -30,11 +30,10 @@ import java.util.Queue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-public class JoinsTree implements Serializable {
+public class JoinsTree implements Serializable {
private static final long serialVersionUID = 1L;
-
+
final Map<String, Chain> tableChains = new LinkedHashMap<>();
public JoinsTree(TableRef rootTable, List<JoinDesc> joins) {
@@ -48,7 +47,7 @@ public class JoinsTree implements Serializable {
// Walk through joins to build FK table to joins mapping
HashMap<String, List<JoinDesc>> fkJoinMap = Maps.newHashMap();
int joinCount = 0;
- for (JoinDesc join: joins) {
+ for (JoinDesc join : joins) {
joinCount++;
String fkSideAlias = join.getFKSide().getAlias();
if (fkJoinMap.containsKey(fkSideAlias)) {
@@ -64,14 +63,14 @@ public class JoinsTree implements Serializable {
chainBuff.add(new Chain(rootTable, null, null));
int chainCount = 0;
while (!chainBuff.isEmpty()) {
- Chain chain= chainBuff.poll();
+ Chain chain = chainBuff.poll();
String pkSideAlias = chain.table.getAlias();
chainCount++;
tableChains.put(pkSideAlias, chain);
// this round pk side is next round's fk side
if (fkJoinMap.containsKey(pkSideAlias)) {
- for (JoinDesc join: fkJoinMap.get(pkSideAlias)) {
+ for (JoinDesc join : fkJoinMap.get(pkSideAlias)) {
chainBuff.add(new Chain(join.getPKSide(), join, chain));
}
}
@@ -149,7 +148,7 @@ public class JoinsTree implements Serializable {
public static class Chain implements Serializable {
private static final long serialVersionUID = 1L;
-
+
TableRef table; // pk side
JoinDesc join;
Chain fkSide;
[16/21] kylin git commit: #KYLIN-490 support multiple column distinct count
Posted by li...@apache.org.
#KYLIN-490 support multiple column distinct count
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/88a1c71d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/88a1c71d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/88a1c71d
Branch: refs/heads/KYLIN-2501
Commit: 88a1c71dde855c693b230f67b92c4cd067d43b2b
Parents: f72a3f6
Author: Roger Shi <ro...@hotmail.com>
Authored: Wed Mar 22 19:22:22 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 27 15:20:40 2017 +0800
----------------------------------------------------------------------
.../kylin/measure/ParamAsMeasureCount.java | 30 +++++
.../BitmapIntersectDistinctCountAggFunc.java | 9 +-
.../measure/percentile/PercentileAggFunc.java | 9 +-
.../kylin/metadata/model/FunctionDesc.java | 62 ++++++---
.../kylin/metadata/model/ParameterDesc.java | 135 +++++++++++++++++--
.../resources/query/sql_distinct/query08.sql | 24 ++++
.../kylin/query/relnode/OLAPAggregateRel.java | 86 +++++++++---
7 files changed, 304 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/measure/ParamAsMeasureCount.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/ParamAsMeasureCount.java b/core-metadata/src/main/java/org/apache/kylin/measure/ParamAsMeasureCount.java
new file mode 100644
index 0000000..b9bcd10
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/ParamAsMeasureCount.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+public interface ParamAsMeasureCount {
+ /**
+ * Get how many parameters are required to identify the measure
+ * Negative value is for var arguments function
+ * @return 0 ==> all parameters
+ * positive number ==> parameter count
+ * negative number ==> parameter count - required number
+ */
+ int getParamAsMeasureCount();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
index cd4d306..a1e2665 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -17,6 +17,8 @@
*/
package org.apache.kylin.measure.bitmap;
+import org.apache.kylin.measure.ParamAsMeasureCount;
+
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -27,9 +29,14 @@ import java.util.Map;
* Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps
* requires an bitmap count distinct measure of uuid, and an dimension of event
*/
-public class BitmapIntersectDistinctCountAggFunc {
+public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount {
private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
+ @Override
+ public int getParamAsMeasureCount() {
+ return -2;
+ }
+
public static class RetentionPartialResult {
Map<Object, BitmapCounter> map;
List keyList;
http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
index ad02019..d3cec8f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
@@ -18,7 +18,9 @@
package org.apache.kylin.measure.percentile;
-public class PercentileAggFunc {
+import org.apache.kylin.measure.ParamAsMeasureCount;
+
+public class PercentileAggFunc implements ParamAsMeasureCount{
public static PercentileCounter init() {
return null;
}
@@ -41,4 +43,9 @@ public class PercentileAggFunc {
public static double result(PercentileCounter counter) {
return counter == null ? 0L : counter.getResultEstimate();
}
+
+ @Override
+ public int getParamAsMeasureCount() {
+ return 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index cbd7574..61c5fac 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,22 +18,26 @@
package org.apache.kylin.metadata.model;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
/**
*/
@@ -48,7 +52,7 @@ public class FunctionDesc implements Serializable {
r.returnDataType = DataType.getType(returnType);
return r;
}
-
+
public static final String FUNC_SUM = "SUM";
public static final String FUNC_MIN = "MIN";
public static final String FUNC_MAX = "MAX";
@@ -95,7 +99,7 @@ public class FunctionDesc implements Serializable {
}
}
- if(parameter != null)
+ if (parameter != null)
parameter.setColRefs(colRefs);
}
@@ -140,6 +144,8 @@ public class FunctionDesc implements Serializable {
return getParameter().getValue();
} else if (isCount()) {
return "_KY_" + "COUNT__"; // ignores parameter, count(*), count(1), count(col) are all the same
+ } else if (isCountDistinct()) {
+ return "_KY_" + getFullExpressionInAlphabetOrder().replaceAll("[(),. ]", "_");
} else {
return "_KY_" + getFullExpression().replaceAll("[(),. ]", "_");
}
@@ -197,6 +203,25 @@ public class FunctionDesc implements Serializable {
return sb.toString();
}
+ /**
+ * Parameters' name appears in alphabet order.
+ * This method is used for funcs whose parameters appear in arbitrary order
+ */
+ public String getFullExpressionInAlphabetOrder() {
+ StringBuilder sb = new StringBuilder(expression);
+ sb.append("(");
+ ParameterDesc localParam = parameter;
+ List<String> flatParams = Lists.newArrayList();
+ while (localParam != null) {
+ flatParams.add(localParam.getValue());
+ localParam = localParam.getNextParameter();
+ }
+ Collections.sort(flatParams);
+ sb.append(Joiner.on(",").join(flatParams));
+ sb.append(")");
+ return sb.toString();
+ }
+
public boolean isDimensionAsMetric() {
return isDimensionAsMetric;
}
@@ -264,13 +289,20 @@ public class FunctionDesc implements Serializable {
return false;
} else if (!expression.equals(other.expression))
return false;
- // NOTE: don't check the parameter of count()
- if (isCount() == false) {
+ if (isCountDistinct()) {
+ // for count distinct func, param's order doesn't matter
+ if (parameter == null) {
+ if (other.parameter != null)
+ return false;
+ } else {
+ return parameter.equalInArbitraryOrder(other.parameter);
+ }
+ } else if (!isCount()) { // NOTE: don't check the parameter of count()
if (parameter == null) {
if (other.parameter != null)
return false;
} else {
- if (!parameter.equals(other.parameter))
+ if (!parameter.equals(other.parameter))
return false;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 8ad20a8..5ba2f14 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -18,17 +18,19 @@
package org.apache.kylin.metadata.model;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Sets;
/**
*/
@@ -38,9 +40,9 @@ public class ParameterDesc implements Serializable {
public static ParameterDesc newInstance(Object... objs) {
if (objs.length == 0)
throw new IllegalArgumentException();
-
+
ParameterDesc r = new ParameterDesc();
-
+
Object obj = objs[0];
if (obj instanceof TblColRef) {
TblColRef col = (TblColRef) obj;
@@ -51,7 +53,7 @@ public class ParameterDesc implements Serializable {
r.type = FunctionDesc.PARAMETER_TYPE_CONSTANT;
r.value = (String) obj;
}
-
+
if (objs.length >= 2) {
r.nextParameter = newInstance(Arrays.copyOfRange(objs, 1, objs.length));
if (r.nextParameter.colRefs.size() > 0) {
@@ -63,7 +65,7 @@ public class ParameterDesc implements Serializable {
}
return r;
}
-
+
@JsonProperty("type")
private String type;
@JsonProperty("value")
@@ -74,6 +76,15 @@ public class ParameterDesc implements Serializable {
private ParameterDesc nextParameter;
private List<TblColRef> colRefs = ImmutableList.of();
+ private Set<PlainParameter> plainParameters = null;
+
+ // Lazy evaluation
+ public Set<PlainParameter> getPlainParameters() {
+ if (plainParameters == null) {
+ plainParameters = PlainParameter.createFromParameterDesc(this);
+ }
+ return plainParameters;
+ }
public String getType() {
return type;
@@ -86,7 +97,7 @@ public class ParameterDesc implements Serializable {
public String getValue() {
return value;
}
-
+
void setValue(String value) {
this.value = value;
}
@@ -94,7 +105,7 @@ public class ParameterDesc implements Serializable {
public List<TblColRef> getColRefs() {
return colRefs;
}
-
+
void setColRefs(List<TblColRef> colRefs) {
this.colRefs = colRefs;
}
@@ -118,7 +129,7 @@ public class ParameterDesc implements Serializable {
if (type != null ? !type.equals(that.type) : that.type != null)
return false;
-
+
ParameterDesc p = this, q = that;
int refi = 0, refj = 0;
for (; p != null && q != null; p = p.nextParameter, q = q.nextParameter) {
@@ -138,10 +149,24 @@ public class ParameterDesc implements Serializable {
return false;
}
}
-
+
return p == null && q == null;
}
+ public boolean equalInArbitraryOrder(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ParameterDesc that = (ParameterDesc) o;
+
+ Set<PlainParameter> thisPlainParams = this.getPlainParameters();
+ Set<PlainParameter> thatPlainParams = that.getPlainParameters();
+
+ return thisPlainParams.containsAll(thatPlainParams) && thatPlainParams.containsAll(thisPlainParams);
+ }
+
@Override
public int hashCode() {
int result = type != null ? type.hashCode() : 0;
@@ -154,4 +179,88 @@ public class ParameterDesc implements Serializable {
return "ParameterDesc [type=" + type + ", value=" + value + ", nextParam=" + nextParameter + "]";
}
+ /**
+ * PlainParameter is created to present ParameterDesc in List style.
+ * Compared to ParameterDesc its advantage is:
+ * 1. easy to compare without considering order
+ * 2. easy to compare one by one
+ */
+ private static class PlainParameter {
+ private String type;
+ private String value;
+ private TblColRef colRef = null;
+
+ private PlainParameter() {
+ }
+
+ public boolean isColumnType() {
+ return FunctionDesc.PARAMETER_TYPE_COLUMN.equals(type);
+ }
+
+ static Set<PlainParameter> createFromParameterDesc(ParameterDesc parameterDesc) {
+ Set<PlainParameter> result = Sets.newHashSet();
+ ParameterDesc local = parameterDesc;
+ List<TblColRef> totalColRef = parameterDesc.colRefs;
+ Integer colIndex = 0;
+ while (local != null) {
+ if (local.isColumnType()) {
+ result.add(createSingleColumnParameter(local, totalColRef.get(colIndex++)));
+ } else {
+ result.add(createSingleValueParameter(local));
+ }
+ local = local.nextParameter;
+ }
+ return result;
+ }
+
+ static PlainParameter createSingleValueParameter(ParameterDesc parameterDesc) {
+ PlainParameter single = new PlainParameter();
+ single.type = parameterDesc.type;
+ single.value = parameterDesc.value;
+ return single;
+ }
+
+ static PlainParameter createSingleColumnParameter(ParameterDesc parameterDesc, TblColRef colRef) {
+ PlainParameter single = new PlainParameter();
+ single.type = parameterDesc.type;
+ single.value = parameterDesc.value;
+ single.colRef = colRef;
+ return single;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = type != null ? type.hashCode() : 0;
+ result = 31 * result + (colRef != null ? colRef.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ PlainParameter that = (PlainParameter) o;
+
+ if (type != null ? !type.equals(that.type) : that.type != null)
+ return false;
+
+ if (this.isColumnType()) {
+ if (!that.isColumnType())
+ return false;
+ if (!this.colRef.equals(that.colRef)) {
+ return false;
+ }
+ } else {
+ if (that.isColumnType())
+ return false;
+ if (!this.value.equals(that.value))
+ return false;
+ }
+
+ return true;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/kylin-it/src/test/resources/query/sql_distinct/query08.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_distinct/query08.sql b/kylin-it/src/test/resources/query/sql_distinct/query08.sql
new file mode 100644
index 0000000..60f02e7
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_distinct/query08.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select cal_dt,
+ sum(price) as GMV,
+ count(1) as TRANS_CNT,
+ count(distinct seller_id, lstg_format_name) as DIST_SELLER_FORMAT
+ from test_kylin_fact
+ group by cal_dt
http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 8d7c597..2c75a14 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -18,6 +18,7 @@
package org.apache.kylin.query.relnode;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -55,6 +56,7 @@ import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.ParamAsMeasureCount;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
@@ -71,6 +73,7 @@ import com.google.common.collect.Sets;
public class OLAPAggregateRel extends Aggregate implements OLAPRel {
private final static Map<String, String> AGGR_FUNC_MAP = new HashMap<String, String>();
+ private final static Map<String, Integer> AGGR_FUNC_PARAM_AS_MEASTURE_MAP = new HashMap<String, Integer>();
static {
AGGR_FUNC_MAP.put("SUM", "SUM");
@@ -84,6 +87,15 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
for (String udaf : udafFactories.keySet()) {
AGGR_FUNC_MAP.put(udaf, udafFactories.get(udaf).getAggrFunctionName());
}
+
+ Map<String, Class<?>> udafs = MeasureTypeFactory.getUDAFs();
+ for (String func : udafs.keySet()) {
+ try {
+ AGGR_FUNC_PARAM_AS_MEASTURE_MAP.put(func, ((ParamAsMeasureCount) (udafs.get(func).newInstance())).getParamAsMeasureCount());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
private static String getSqlFuncName(AggregateCall aggCall) {
@@ -235,12 +247,27 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
this.aggregations = new ArrayList<FunctionDesc>();
for (AggregateCall aggCall : this.rewriteAggCalls) {
ParameterDesc parameter = null;
+ // By default all args are included, UDFs can define their own in getParamAsMeasureCount method.
if (!aggCall.getArgList().isEmpty()) {
- // TODO: Currently only get the column of first param
- int index = aggCall.getArgList().get(0);
- TblColRef column = inputColumnRowType.getColumnByIndex(index);
- if (!column.isInnerColumn()) {
- parameter = ParameterDesc.newInstance(column);
+ List<TblColRef> columns = Lists.newArrayList();
+ String funcName = getSqlFuncName(aggCall);
+ int columnsCount = aggCall.getArgList().size();
+ if (AGGR_FUNC_PARAM_AS_MEASTURE_MAP.containsKey(funcName)) {
+ int asMeasureCnt = AGGR_FUNC_PARAM_AS_MEASTURE_MAP.get(funcName);
+ if (asMeasureCnt > 0) {
+ columnsCount = asMeasureCnt;
+ } else {
+ columnsCount += asMeasureCnt;
+ }
+ }
+ for (Integer index : aggCall.getArgList().subList(0, columnsCount)) {
+ TblColRef column = inputColumnRowType.getColumnByIndex(index);
+ if (!column.isInnerColumn()) {
+ columns.add(column);
+ }
+ }
+ if (!columns.isEmpty()) {
+ parameter = ParameterDesc.newInstance(columns.toArray(new TblColRef[columns.size()]));
}
}
String expression = getAggrFuncName(aggCall);
@@ -341,10 +368,11 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
AggregateCall aggCall = this.rewriteAggCalls.get(i);
if (!aggCall.getArgList().isEmpty()) {
- int index = aggCall.getArgList().get(0);
- TblColRef column = inputColumnRowType.getColumnByIndex(index);
- if (!column.isInnerColumn()) {
- this.context.metricsColumns.add(column);
+ for (Integer index : aggCall.getArgList()) {
+ TblColRef column = inputColumnRowType.getColumnByIndex(index);
+ if (!column.isInnerColumn()) {
+ this.context.metricsColumns.add(column);
+ }
}
}
}
@@ -385,18 +413,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
return aggCall;
}
- // rebuild parameters
- List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
- if (func.needRewriteField()) {
- RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
- if (newArgList.isEmpty()) {
- newArgList.add(field.getIndex());
- } else {
- // only the first column got overwritten
- newArgList.set(0, field.getIndex());
- }
- }
-
// rebuild function
String callName = getSqlFuncName(aggCall);
RelDataType fieldType = aggCall.getType();
@@ -408,12 +424,40 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
newAgg = createCustomAggFunction(callName, fieldType, udafMap.get(callName));
}
+ // rebuild parameters
+ List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
+ if (udafMap != null && udafMap.containsKey(callName)) {
+ newArgList = truncArgList(newArgList, udafMap.get(callName));
+ }
+ if (func.needRewriteField()) {
+ RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
+ if (newArgList.isEmpty()) {
+ newArgList.add(field.getIndex());
+ } else {
+ // TODO: only the first column got overwritten
+ newArgList.set(0, field.getIndex());
+ }
+ }
+
// rebuild aggregate call
@SuppressWarnings("deprecation")
AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName);
return newAggCall;
}
+ /**
+ * truncate Arg List according to UDAF's "add" method parameter count
+ */
+ private List<Integer> truncArgList(List<Integer> argList, Class<?> udafClazz) {
+ int argListLength = argList.size();
+ for (Method method : udafClazz.getMethods()) {
+ if (method.getName().equals("add")) {
+ argListLength = Math.min(method.getParameterTypes().length - 1, argListLength);
+ }
+ }
+ return argList.subList(0, argListLength);
+ }
+
private SqlAggFunction createCustomAggFunction(String funcName, RelDataType returnType, Class<?> customAggFuncClz) {
RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
SqlIdentifier sqlIdentifier = new SqlIdentifier(funcName, new SqlParserPos(1, 1));
[15/21] kylin git commit: minor change on kylin pom
Posted by li...@apache.org.
minor change on kylin pom
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f72a3f6d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f72a3f6d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f72a3f6d
Branch: refs/heads/KYLIN-2501
Commit: f72a3f6d35c028ebb5c68ee6bd11b97750fbf194
Parents: c82603a
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Mar 20 20:08:43 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 20 20:08:43 2017 +0800
----------------------------------------------------------------------
pom.xml | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f72a3f6d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dec04bc..0e41dcf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -806,8 +806,7 @@
<dependencies>
<!-- the logging dependencies are inherited by all modules for their generality
- log4j and slf4j-log4j12 test scope only for UT/IT use
- -->
+ log4j and slf4j-log4j12 test scope only for UT/IT use -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
[12/21] kylin git commit: KYLIN-2489 upgrade zookeeper to 3.4.8
Posted by li...@apache.org.
KYLIN-2489 upgrade zookeeper to 3.4.8
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/98664f0d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/98664f0d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/98664f0d
Branch: refs/heads/KYLIN-2501
Commit: 98664f0decb0591ccc44692193145a1187201dfd
Parents: 0860048
Author: Li Yang <li...@apache.org>
Authored: Sun Mar 19 06:52:39 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Mar 19 06:52:39 2017 +0800
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/98664f0d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 30506e5..9a56dde 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
<avatica.version>1.9.0</avatica.version>
<!-- Hadoop Common deps, keep compatible with hadoop2.version -->
- <zookeeper.version>3.4.6</zookeeper.version>
+ <zookeeper.version>3.4.8</zookeeper.version>
<curator.version>2.7.1</curator.version>
<jsr305.version>3.0.1</jsr305.version>
<guava.version>14.0</guava.version>
[21/21] kylin git commit: KYLIN-2501 bugfix, pass IT
Posted by li...@apache.org.
KYLIN-2501 bugfix, pass IT
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d045a045
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d045a045
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d045a045
Branch: refs/heads/KYLIN-2501
Commit: d045a045a51f587afb35a997a87e249ad1da4adb
Parents: 7e3c423
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 22 16:31:45 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 31 14:59:51 2017 +0800
----------------------------------------------------------------------
.../gridtable/GTStreamAggregateScanner.java | 24 +++--
.../apache/kylin/storage/StorageContext.java | 12 ---
.../gtrecord/GTCubeStorageQueryBase.java | 7 --
.../storage/gtrecord/PartitionResultMerger.java | 100 -------------------
.../gtrecord/SegmentCubeTupleIterator.java | 7 +-
.../SortMergedPartitionResultIterator.java | 81 +++++++++++++++
.../gtrecord/StorageResponseGTScatter.java | 13 ++-
7 files changed, 108 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
index 1fde423..4eb5791 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -18,6 +18,7 @@
package org.apache.kylin.gridtable;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.kylin.GTForwardingScanner;
@@ -38,11 +39,10 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
private final GTScanRequest req;
private final Comparator<GTRecord> keyComparator;
- public GTStreamAggregateScanner(IGTScanner delegated,
- GTScanRequest req, Comparator<GTRecord> keyComparator) {
+ public GTStreamAggregateScanner(IGTScanner delegated, GTScanRequest scanRequest) {
super(delegated);
- this.req = req;
- this.keyComparator = keyComparator;
+ this.req = Preconditions.checkNotNull(scanRequest, "scanRequest");
+ this.keyComparator = GTRecord.getComparator(scanRequest.getAggrGroupBy());
}
@Override
@@ -172,14 +172,22 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
private int[] gtDimsIdx;
- private int[] gtMetricsIdx;
+ private int[] gtMetricsIdx; // specify which metric to return and their order
+ private int[] aggIdx; // specify the ith returning metric's aggStates index
+
private Object[] result; // avoid object creation
StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
super(input);
this.gtDimsIdx = gtDimsIdx;
this.gtMetricsIdx = gtMetricsIdx;
- result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+ this.aggIdx = new int[gtMetricsIdx.length];
+ for (int i = 0; i < aggIdx.length; i++) {
+ int metricIdx = gtMetricsIdx[i];
+ aggIdx[i] = metrics.trueBitIndexOf(metricIdx);
+ }
+
+ this.result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
}
private void decodeAndSetDimensions(GTRecord record) {
@@ -202,8 +210,8 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
decodeAndSetDimensions(record);
// set metrics
- for (int i = 0; i < gtMetricsIdx.length; i++) {
- result[gtDimsIdx.length + i] = aggStates[i];
+ for (int i = 0; i < aggIdx.length; i++) {
+ result[gtDimsIdx.length + i] = aggStates[aggIdx[i]];
}
return result;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index bb17054..4522261 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,12 +18,10 @@
package org.apache.kylin.storage;
-import java.util.Comparator;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
import org.slf4j.Logger;
@@ -49,9 +47,7 @@ public class StorageContext {
private boolean exactAggregation = false;
private boolean needStorageAggregation = false;
private boolean enableCoprocessor = false;
-
private boolean enableStreamAggregate = false;
- private Comparator<GTRecord> groupKeyComparator;
private IStorageQuery storageQuery;
private AtomicLong processedRowCount = new AtomicLong();
@@ -242,12 +238,4 @@ public class StorageContext {
public void enableStreamAggregate() {
this.enableStreamAggregate = true;
}
-
- public Comparator<GTRecord> getGroupKeyComparator() {
- return groupKeyComparator;
- }
-
- public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
- this.groupKeyComparator = groupKeyComparator;
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 82590a2..d91a0b4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,18 +26,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.RawQueryLastHacker;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -392,11 +389,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
if (enabled) {
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
- ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
-
context.enableStreamAggregate();
- context.setGroupKeyComparator(GTRecord.getComparator(cols));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
deleted file mode 100644
index 52029d3..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.gtrecord;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
-
-/**
- * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
- */
-public class PartitionResultMerger implements Iterable<GTRecord> {
- private final ImmutableList<PartitionResultIterator> partitionResults;
- private final GTInfo info;
- private final Comparator<GTRecord> comparator;
-
- public PartitionResultMerger(
- Iterable<PartitionResultIterator> partitionResults,
- GTInfo info, Comparator<GTRecord> comparator) {
- this.partitionResults = ImmutableList.copyOf(partitionResults);
- this.info = info;
- this.comparator = comparator;
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- if (partitionResults.size() == 1) {
- return partitionResults.get(0);
- }
- return new MergingResultsIterator();
- }
-
- private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
- final GTRecord record = new GTRecord(info); // reuse to avoid object creation
-
- PriorityQueue<PeekingIterator<GTRecord>> heap;
-
- MergingResultsIterator() {
- Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
- public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
- return comparator.compare(o1.peek(), o2.peek());
- }
- };
- this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
-
- for (PartitionResultIterator it : partitionResults) {
- if (it.hasNext()) {
- heap.offer(Iterators.peekingIterator(it));
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return !heap.isEmpty();
- }
-
- @Override
- public GTRecord next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- // get smallest record
- PeekingIterator<GTRecord> it = heap.poll();
- // WATCH OUT! record got from PartitionResultIterator.next() may changed later,
- // so we must make a shallow copy of it.
- record.shallowCopyFrom(it.next());
-
- if (it.hasNext()) {
- heap.offer(it);
- }
-
- return record;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 11f766c..3bac5ec 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -90,8 +90,8 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
final Iterator<GTRecord> records, final GTScanRequest scanRequest,
final int[] gtDimsIdx, final int[] gtMetricsIdx) {
- boolean singlePartitionResult = records instanceof PartitionResultIterator;
- if (context.isStreamAggregateEnabled() && !singlePartitionResult) {
+ boolean hasMultiplePartitions = records instanceof SortMergedPartitionResultIterator;
+ if (hasMultiplePartitions && context.isStreamAggregateEnabled()) {
// input records are ordered, leverage stream aggregator to produce possibly fewer records
IGTScanner inputScanner = new IGTScanner() {
public GTInfo getInfo() {
@@ -104,8 +104,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
return records;
}
};
- GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
- inputScanner, scanRequest, context.getGroupKeyComparator());
+ GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest);
return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
new file mode 100644
index 0000000..21e61e3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
+ */
+public class SortMergedPartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+
+ final GTRecord record ; // reuse to avoid object creation
+ PriorityQueue<PeekingIterator<GTRecord>> heap;
+
+ SortMergedPartitionResultIterator(
+ List<PartitionResultIterator> partitionResults,
+ GTInfo info, final Comparator<GTRecord> comparator) {
+
+ this.record = new GTRecord(info);
+ Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+ public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+ return comparator.compare(o1.peek(), o2.peek());
+ }
+ };
+ this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
+
+ for (PartitionResultIterator it : partitionResults) {
+ if (it.hasNext()) {
+ heap.offer(Iterators.peekingIterator(it));
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !heap.isEmpty();
+ }
+
+ @Override
+ public GTRecord next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // get smallest record
+ PeekingIterator<GTRecord> it = heap.poll();
+ // WATCH OUT! record got from PartitionResultIterator.next() may changed later,
+ // so we must make a shallow copy of it.
+ record.shallowCopyFrom(it.next());
+
+ if (it.hasNext()) {
+ heap.offer(it);
+ }
+
+ return record;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 0f1e191..f1ab20c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -44,7 +44,7 @@ public class StorageResponseGTScatter implements IGTScanner {
private IPartitionStreamer partitionStreamer;
private final Iterator<byte[]> blocks;
private final ImmutableBitSet columns;
- private final StorageContext context;
+ private final ImmutableBitSet groupByDims;
private final boolean needSorted; // whether scanner should return sorted records
public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
@@ -52,7 +52,7 @@ public class StorageResponseGTScatter implements IGTScanner {
this.partitionStreamer = partitionStreamer;
this.blocks = partitionStreamer.asByteArrayIterator();
this.columns = scanRequest.getColumns();
- this.context = context;
+ this.groupByDims = scanRequest.getAggrGroupBy();
this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled();
}
@@ -74,13 +74,16 @@ public class StorageResponseGTScatter implements IGTScanner {
partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
}
+ if (partitionResults.size() == 1) {
+ return partitionResults.get(0);
+ }
+
if (!needSorted) {
logger.debug("Using Iterators.concat to merge partition results");
return Iterators.concat(partitionResults.iterator());
}
- logger.debug("Using PartitionResultMerger to merge partition results");
- PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
- return merger.iterator();
+ logger.debug("Using SortMergedPartitionResultIterator to merge partition results");
+ return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims));
}
}
[08/21] kylin git commit: KYLIN-2514 handle disordered joins in data model
Posted by li...@apache.org.
KYLIN-2514 handle disordered joins in data model
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/14b96a86
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/14b96a86
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/14b96a86
Branch: refs/heads/KYLIN-2501
Commit: 14b96a86dc4cf6da78f2137838ca9827848f52be
Parents: 40249fe
Author: Roger Shi <ro...@hotmail.com>
Authored: Fri Mar 17 19:27:18 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 17 19:55:27 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/metadata/model/JoinsTree.java | 43 +++++++++++++++++---
.../model_desc/ci_left_join_model.json | 28 ++++++-------
2 files changed, 52 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/14b96a86/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index c7666cb..3c876a0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -24,8 +24,12 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
public class JoinsTree implements Serializable {
private static final long serialVersionUID = 1L;
@@ -40,12 +44,41 @@ public class JoinsTree implements Serializable {
Preconditions.checkState(col.isQualified());
}
- tableChains.put(rootTable.getAlias(), new Chain(rootTable, null, null));
+ // Walk through joins to build FK table to joins mapping
+ HashMap<String, List<JoinDesc>> fkJoinMap = Maps.newHashMap();
+ int joinCount = 0;
+ for (JoinDesc join: joins) {
+ joinCount++;
+ String fkSideAlias = join.getFKSide().getAlias();
+ if (fkJoinMap.containsKey(fkSideAlias)) {
+ fkJoinMap.get(fkSideAlias).add(join);
+ } else {
+ List<JoinDesc> joinDescList = Lists.newArrayList(join);
+ fkJoinMap.put(fkSideAlias, joinDescList);
+ }
+ }
- for (JoinDesc join : joins) {
- TableRef pkSide = join.getPKSide();
- Chain fkSide = tableChains.get(join.getFKSide().getAlias());
- tableChains.put(pkSide.getAlias(), new Chain(pkSide, join, fkSide));
+ // Width-first build tree (tableChains)
+ Queue<Chain> chainBuff = Queues.newArrayDeque();
+ chainBuff.add(new Chain(rootTable, null, null));
+ int chainCount = 0;
+ while (!chainBuff.isEmpty()) {
+ Chain chain= chainBuff.poll();
+ String pkSideAlias = chain.table.getAlias();
+ chainCount++;
+ tableChains.put(pkSideAlias, chain);
+
+ // this round pk side is next round's fk side
+ if (fkJoinMap.containsKey(pkSideAlias)) {
+ for (JoinDesc join: fkJoinMap.get(pkSideAlias)) {
+ chainBuff.add(new Chain(join.getPKSide(), join, chain));
+ }
+ }
+ }
+
+ // if join count not match (chain count - 1), there must be some join not take effect
+ if (joinCount != (chainCount - 1)) {
+ throw new IllegalArgumentException("There's some illegal Joins, please check your model");
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/14b96a86/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json b/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json
index 1b08aaf..bc5b444 100644
--- a/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json
+++ b/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json
@@ -31,20 +31,6 @@
}
},
{
- "table": "DEFAULT.TEST_ACCOUNT",
- "alias": "SELLER_ACCOUNT",
- "kind": "FACT",
- "join": {
- "type": "LEFT",
- "primary_key": [
- "SELLER_ACCOUNT.ACCOUNT_ID"
- ],
- "foreign_key": [
- "TEST_KYLIN_FACT.SELLER_ID"
- ]
- }
- },
- {
"table": "EDW.TEST_CAL_DT",
"join": {
"type": "LEFT",
@@ -119,6 +105,20 @@
"SELLER_ACCOUNT.ACCOUNT_COUNTRY"
]
}
+ },
+ {
+ "table": "DEFAULT.TEST_ACCOUNT",
+ "alias": "SELLER_ACCOUNT",
+ "kind": "FACT",
+ "join": {
+ "type": "LEFT",
+ "primary_key": [
+ "SELLER_ACCOUNT.ACCOUNT_ID"
+ ],
+ "foreign_key": [
+ "TEST_KYLIN_FACT.SELLER_ID"
+ ]
+ }
}
],
"dimensions": [
[20/21] kylin git commit: KYLIN-2501 Stream Aggregate GTRecords at Query Server
Posted by li...@apache.org.
KYLIN-2501 Stream Aggregate GTRecords at Query Server
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7e3c4234
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7e3c4234
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7e3c4234
Branch: refs/heads/KYLIN-2501
Commit: 7e3c4234e57f0f36c1c1829d7f97d67f8eeaa77b
Parents: 4c21821
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 15 22:45:02 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 31 14:59:51 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 +
.../kylin/common/util/ImmutableBitSet.java | 29 ++-
.../org/apache/kylin/GTForwardingScanner.java | 56 +++++
.../kylin/cube/gridtable/CubeGridTable.java | 18 --
.../gridtable/CuboidToGridTableMapping.java | 18 ++
.../cube/inmemcubing/InMemCubeBuilder.java | 6 +-
.../kylin/gridtable/GTAggregateScanner.java | 16 +-
.../apache/kylin/gridtable/GTFilterScanner.java | 22 +-
.../org/apache/kylin/gridtable/GTRecord.java | 80 +++----
.../apache/kylin/gridtable/GTScanRequest.java | 13 ++
.../gridtable/GTStreamAggregateScanner.java | 211 +++++++++++++++++++
.../kylin/gridtable/GTScanReqSerDerTest.java | 4 +-
.../apache/kylin/storage/StorageContext.java | 20 ++
.../storage/gtrecord/CubeScanRangePlanner.java | 3 +-
.../storage/gtrecord/CubeSegmentScanner.java | 7 +-
.../storage/gtrecord/CubeTupleConverter.java | 31 +--
.../gtrecord/GTCubeStorageQueryBase.java | 38 +++-
.../kylin/storage/gtrecord/ITupleConverter.java | 3 +-
.../gtrecord/PartitionResultIterator.java | 59 ++++++
.../storage/gtrecord/PartitionResultMerger.java | 100 +++++++++
.../kylin/storage/gtrecord/ScannerWorker.java | 5 +-
.../gtrecord/SegmentCubeTupleIterator.java | 72 ++++++-
.../gtrecord/StorageResponseGTScatter.java | 82 +++----
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 7 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 5 +-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 5 +-
26 files changed, 704 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 02349ad..9cd35c8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -802,6 +802,10 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.valueOf(getOptional("kylin.query.skip-empty-segments", "true"));
}
+ public boolean isStreamAggregateEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true"));
+ }
+
@Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold
public int getStoragePushDownLimitMax() {
return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index b417877..5cdf08c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -19,8 +19,9 @@ package org.apache.kylin.common.util;
import java.nio.ByteBuffer;
import java.util.BitSet;
+import java.util.Iterator;
-public class ImmutableBitSet {
+public class ImmutableBitSet implements Iterable<Integer> {
public static final ImmutableBitSet EMPTY = new ImmutableBitSet(new BitSet());
@@ -168,4 +169,30 @@ public class ImmutableBitSet {
return new ImmutableBitSet(bitSet);
}
};
+
+ /**
+ * Iterate over the positions of true value.
+ * @return the iterator
+ */
+ @Override
+ public Iterator<Integer> iterator() {
+ return new Iterator<Integer>() {
+ int index = 0;
+
+ @Override
+ public boolean hasNext() {
+ return index < arr.length;
+ }
+
+ @Override
+ public Integer next() {
+ return arr[index++];
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
new file mode 100644
index 0000000..de8c88d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin;
+
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link IGTScanner} which forwards all its method calls to another scanner.
+ *
+ * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
+ */
+public class GTForwardingScanner implements IGTScanner {
+ protected IGTScanner delegated;
+
+ protected GTForwardingScanner(IGTScanner delegated) {
+ this.delegated = checkNotNull(delegated, "delegated");
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return delegated.getInfo();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegated.close();
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return delegated.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index 563cf43..5cee9df 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -18,29 +18,11 @@
package org.apache.kylin.cube.gridtable;
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dimension.IDimensionEncodingMap;
import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.metadata.model.TblColRef;
public class CubeGridTable {
-
- public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) {
- Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId);
- return newGTInfo(cuboid, new CubeDimEncMap(cubeSeg));
- }
-
- public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) {
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
- return newGTInfo(cuboid, new CubeDimEncMap(cubeDesc, dictionaryMap));
- }
-
public static GTInfo newGTInfo(Cuboid cuboid, IDimensionEncodingMap dimEncMap) {
CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 2e5dd12..6879687 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -140,11 +140,29 @@ public class CuboidToGridTableMapping {
return i == null ? -1 : i.intValue();
}
+ public int[] getDimIndexes(Collection<TblColRef> dims) {
+ int[] result = new int[dims.size()];
+ int i = 0;
+ for (TblColRef dim : dims) {
+ result[i++] = getIndexOf(dim);
+ }
+ return result;
+ }
+
public int getIndexOf(FunctionDesc metric) {
Integer r = metrics2gt.get(metric);
return r == null ? -1 : r;
}
+ public int[] getMetricsIndexes(Collection<FunctionDesc> metrics) {
+ int[] result = new int[metrics.size()];
+ int i = 0;
+ for (FunctionDesc metric : metrics) {
+ result[i++] = getIndexOf(metric);
+ }
+ return result;
+ }
+
public List<TblColRef> getCuboidDimensionsInGTOrder() {
return cuboid.getColumns();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e08844e..a26e948 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTAggregateScanner;
import org.apache.kylin.gridtable.GTBuilder;
@@ -108,7 +109,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
- GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
+ GTInfo info = CubeGridTable.newGTInfo(
+ Cuboid.findById(cubeDesc, cuboidID),
+ new CubeDimEncMap(cubeDesc, dictionaryMap)
+ );
// Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
// MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 7cdd4f5..0dd6fa9 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -45,7 +45,6 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.metadata.datatype.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +62,7 @@ public class GTAggregateScanner implements IGTScanner {
final ImmutableBitSet metrics;
final String[] metricsAggrFuncs;
final IGTScanner inputScanner;
+ final BufferedMeasureCodec measureCodec;
final AggregationCache aggrCache;
final long spillThreshold; // 0 means no memory control && no spill
final int storagePushDownLimit;//default to be Int.MAX
@@ -86,6 +86,7 @@ public class GTAggregateScanner implements IGTScanner {
this.metrics = req.getAggrMetrics();
this.metricsAggrFuncs = req.getAggrMetricsFuncs();
this.inputScanner = inputScanner;
+ this.measureCodec = req.createMeasureCodec();
this.aggrCache = new AggregationCache();
this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
this.aggrMask = new boolean[metricsAggrFuncs.length];
@@ -175,7 +176,6 @@ public class GTAggregateScanner implements IGTScanner {
final int keyLength;
final boolean[] compareMask;
boolean compareAll = true;
- final BufferedMeasureCodec measureCodec;
final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
@Override
@@ -213,18 +213,6 @@ public class GTAggregateScanner implements IGTScanner {
keyLength = compareMask.length;
dumps = Lists.newArrayList();
aggBufMap = createBuffMap();
- measureCodec = createMeasureCodec();
- }
-
- private BufferedMeasureCodec createMeasureCodec() {
- DataType[] types = new DataType[metrics.trueBitCount()];
- for (int i = 0; i < types.length; i++) {
- types[i] = info.getColumnType(metrics.trueBitAt(i));
- }
-
- BufferedMeasureCodec result = new BufferedMeasureCodec(types);
- result.setBufferSize(info.getMaxColumnLength(metrics));
- return result;
}
private boolean[] createCompareMask() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
index 717f89c..cad0a04 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
+import org.apache.kylin.GTForwardingScanner;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -33,17 +34,16 @@ import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
-public class GTFilterScanner implements IGTScanner {
+public class GTFilterScanner extends GTForwardingScanner {
- final private IGTScanner inputScanner;
final private TupleFilter filter;
final private IFilterCodeSystem<ByteArray> filterCodeSystem;
final private IEvaluatableTuple oneTuple; // avoid instance creation
private GTRecord next = null;
- public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
- this.inputScanner = inputScanner;
+ public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException {
+ super(delegated);
this.filter = req.getFilterPushDown();
this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
this.oneTuple = new IEvaluatableTuple() {
@@ -53,25 +53,15 @@ public class GTFilterScanner implements IGTScanner {
}
};
- if (TupleFilter.isEvaluableRecursively(filter) == false)
+ if (!TupleFilter.isEvaluableRecursively(filter))
throw new IllegalArgumentException();
}
@Override
- public GTInfo getInfo() {
- return inputScanner.getInfo();
- }
-
- @Override
- public void close() throws IOException {
- inputScanner.close();
- }
-
- @Override
public Iterator<GTRecord> iterator() {
return new Iterator<GTRecord>() {
- private Iterator<GTRecord> inputIterator = inputScanner.iterator();
+ private Iterator<GTRecord> inputIterator = delegated.iterator();
private FilterResultCache resultCache = new FilterResultCache(getInfo(), filter);
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index f4480c8..3397adc 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -21,7 +21,6 @@ package org.apache.kylin.gridtable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.List;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -46,18 +45,21 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
}
this.info = info;
}
-
- public GTRecord(GTRecord other) {
- this.info = other.info;
- this.cols = new ByteArray[info.getColumnCount()];
- for (int i = 0; i < other.cols.length; i++) {
- this.cols[i] = other.cols[i].copy();
+
+ @Override
+ public GTRecord clone() { // deep copy
+ ByteArray[] cols = new ByteArray[this.cols.length];
+ for (int i = 0; i < cols.length; i++) {
+ cols[i] = this.cols[i].copy();
}
+ return new GTRecord(this.info, cols);
}
- @Override
- public Object clone() {
- return new GTRecord(this);
+ public void shallowCopyFrom(GTRecord source) {
+ assert info == source.info;
+ for (int i = 0; i < cols.length; i++) {
+ cols[i].set(source.cols[i]);
+ }
}
public GTInfo getInfo() {
@@ -106,30 +108,18 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
/** decode and return the values of this record */
public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) {
assert selectedCols.cardinality() == result.length;
-
for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- if (cols[c] == null || cols[c].array() == null) {
- result[i] = null;
- } else {
- result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
- }
+ result[i] = decodeValue(selectedCols.trueBitAt(i));
}
return result;
}
- /** decode and return the values of this record */
- public Object[] getValues(int[] selectedColumns, Object[] result) {
- assert selectedColumns.length <= result.length;
- for (int i = 0; i < selectedColumns.length; i++) {
- int c = selectedColumns[i];
- if (cols[c].array() == null) {
- result[i] = null;
- } else {
- result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
- }
+ public Object decodeValue(int c) {
+ ByteArray col = cols[c];
+ if (col != null && col.array() != null) {
+ return info.codeSystem.decodeColumnValue(c, col.asBuffer());
}
- return result;
+ return null;
}
public int sizeOf(ImmutableBitSet selectedCols) {
@@ -198,19 +188,13 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
return compareToInternal(o, info.colAll);
}
- public int compareToOnPrimaryKey(GTRecord o) {
- return compareToInternal(o, info.primaryKey);
- }
-
- public static Comparator<GTRecord> getPrimaryKeyComparator() {
+ public static Comparator<GTRecord> getComparator(final ImmutableBitSet participateCols) {
return new Comparator<GTRecord>() {
- @Override
public int compare(GTRecord o1, GTRecord o2) {
if (o1 == null || o2 == null) {
throw new IllegalStateException("Cannot handle null");
}
-
- return o1.compareToOnPrimaryKey(o2);
+ return o1.compareToInternal(o2, participateCols);
}
};
}
@@ -287,26 +271,14 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
loadColumns(info.colBlocks[c], buf);
}
- /** change pointers to point to data in given buffer, UNLIKE deserialize */
- public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
- int pos = buf.position();
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- int len = info.codeSystem.codeLength(c, buf);
- cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
- pos += len;
- buf.position(pos);
- }
- }
-
- /** change pointers to point to data in given buffer, UNLIKE deserialize
- * unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this
- * method allows to defined specific columns(in order) to load
+ /**
+ * Change pointers to point to data in given buffer, UNLIKE deserialize
+ * @param selectedCols positions of column to load
+ * @param buf buffer containing continuous data of selected columns
*/
- public void loadColumns(List<Integer> selectedCols, ByteBuffer buf) {
+ public void loadColumns(Iterable<Integer> selectedCols, ByteBuffer buf) {
int pos = buf.position();
- for (int i = 0; i < selectedCols.size(); i++) {
- int c = selectedCols.get(i);
+ for (int c : selectedCols) {
int len = info.codeSystem.codeLength(c, buf);
cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
pos += len;
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 4629c8e..ae35d2b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -31,6 +31,8 @@ import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.SerializeToByteBuffer;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -202,6 +204,17 @@ public class GTScanRequest {
}
+ public BufferedMeasureCodec createMeasureCodec() {
+ DataType[] metricTypes = new DataType[aggrMetrics.trueBitCount()];
+ for (int i = 0; i < metricTypes.length; i++) {
+ metricTypes[i] = info.getColumnType(aggrMetrics.trueBitAt(i));
+ }
+
+ BufferedMeasureCodec codec = new BufferedMeasureCodec(metricTypes);
+ codec.setBufferSize(info.getMaxColumnLength(aggrMetrics));
+ return codec;
+ }
+
public boolean isDoingStorageAggregation() {
return doingStorageAggregation;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
new file mode 100644
index 0000000..1fde423
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.kylin.GTForwardingScanner;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * GTStreamAggregateScanner requires input records to be sorted on group fields.
+ * In such cases, it's superior to hash/sort based aggregator because it can produce
+ * ordered outputs on the fly and the memory consumption is very low.
+ */
+public class GTStreamAggregateScanner extends GTForwardingScanner {
+ private final GTScanRequest req;
+ private final Comparator<GTRecord> keyComparator;
+
+ public GTStreamAggregateScanner(IGTScanner delegated,
+ GTScanRequest req, Comparator<GTRecord> keyComparator) {
+ super(delegated);
+ this.req = req;
+ this.keyComparator = keyComparator;
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return new StreamMergeGTRecordIterator(delegated.iterator());
+ }
+
+ public Iterator<Object[]> valuesIterator(int[] gtDimsIdx, int[] gtMetricsIdx) {
+ return new StreamMergeValuesIterator(delegated.iterator(), gtDimsIdx, gtMetricsIdx);
+ }
+
+ private abstract class AbstractStreamMergeIterator<E> implements Iterator<E> {
+ final PeekingIterator<GTRecord> input;
+ final IGTCodeSystem codeSystem;
+ final ImmutableBitSet dimensions;
+ final ImmutableBitSet metrics;
+ final String[] metricFuncs;
+ final BufferedMeasureCodec measureCodec;
+
+ private final GTRecord first; // reuse to avoid object creation
+
+ AbstractStreamMergeIterator(Iterator<GTRecord> input) {
+ this.input = Iterators.peekingIterator(input);
+ this.codeSystem = req.getInfo().getCodeSystem();
+ this.dimensions = req.getDimensions();
+ this.metrics = req.getAggrMetrics();
+ this.metricFuncs = req.getAggrMetricsFuncs();
+ this.measureCodec = req.createMeasureCodec();
+
+ this.first = new GTRecord(req.getInfo());
+ }
+
+ @Override
+ public boolean hasNext() {
+ return input.hasNext();
+ }
+
+ private boolean isSameKey(GTRecord o1, GTRecord o2) {
+ return keyComparator.compare(o1, o2) == 0;
+ }
+
+ private boolean shouldMergeNext(GTRecord current) {
+ return input.hasNext() && isSameKey(current, input.peek());
+ }
+
+ protected abstract E finalizeResult(GTRecord record);
+
+ protected abstract E finalizeResult(GTRecord record, Object[] aggStates);
+
+ @Override
+ public E next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // WATCH OUT! record returned by "input" scanner could be changed later,
+ // so we must make a shallow copy of it.
+ first.shallowCopyFrom(input.next());
+
+ // shortcut to avoid extra deserialize/serialize cost
+ if (!shouldMergeNext(first)) {
+ return finalizeResult(first);
+ }
+ // merge records with the same key
+ MeasureAggregator[] aggrs = codeSystem.newMetricsAggregators(metrics, metricFuncs);
+ aggregate(aggrs, first);
+ aggregate(aggrs, input.next()); // no need to copy record because it's not referred to later
+ while (shouldMergeNext(first)) {
+ aggregate(aggrs, input.next());
+ }
+
+ Object[] aggStates = new Object[aggrs.length];
+ for (int i = 0; i < aggStates.length; i++) {
+ aggStates[i] = aggrs[i].getState();
+ }
+ return finalizeResult(first, aggStates);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void aggregate(MeasureAggregator[] aggregators, GTRecord record) {
+ for (int i = 0; i < aggregators.length; i++) {
+ int c = metrics.trueBitAt(i);
+ Object metric = codeSystem.decodeColumnValue(c, record.cols[c].asBuffer());
+ aggregators[i].aggregate(metric);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+ }
+
+ private class StreamMergeGTRecordIterator extends AbstractStreamMergeIterator<GTRecord> {
+
+ private GTRecord returnRecord; // avoid object creation
+
+ StreamMergeGTRecordIterator(Iterator<GTRecord> input) {
+ super(input);
+ this.returnRecord = new GTRecord(req.getInfo());
+ }
+
+ @Override
+ protected GTRecord finalizeResult(GTRecord record) {
+ return record;
+ }
+
+ @Override
+ protected GTRecord finalizeResult(GTRecord record, Object[] aggStates) {
+ // 1. load dimensions
+ for (int c : dimensions) {
+ returnRecord.cols[c] = record.cols[c];
+ }
+ // 2. serialize metrics
+ byte[] bytes = measureCodec.encode(aggStates).array();
+ int[] sizes = measureCodec.getMeasureSizes();
+ // 3. load metrics
+ int offset = 0;
+ for (int i = 0; i < metrics.trueBitCount(); i++) {
+ int c = metrics.trueBitAt(i);
+ returnRecord.cols[c].set(bytes, offset, sizes[i]);
+ offset += sizes[i];
+ }
+ return returnRecord;
+ }
+ }
+
+ private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
+
+ private int[] gtDimsIdx;
+ private int[] gtMetricsIdx;
+ private Object[] result; // avoid object creation
+
+ StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
+ super(input);
+ this.gtDimsIdx = gtDimsIdx;
+ this.gtMetricsIdx = gtMetricsIdx;
+ result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+ }
+
+ private void decodeAndSetDimensions(GTRecord record) {
+ for (int i = 0; i < gtDimsIdx.length; i++) {
+ result[i] = record.decodeValue(gtDimsIdx[i]);
+ }
+ }
+
+ @Override
+ protected Object[] finalizeResult(GTRecord record) {
+ decodeAndSetDimensions(record);
+ // decode metrics
+ for (int i = 0; i < gtMetricsIdx.length; i++) {
+ result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]);
+ }
+ return result;
+ }
+
+ @Override
+ protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
+ decodeAndSetDimensions(record);
+ // set metrics
+ for (int i = 0; i < gtMetricsIdx.length; i++) {
+ result[gtDimsIdx.length + i] = aggStates[i];
+ }
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
index 77cc2d8..1ae229a 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
@@ -29,6 +29,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -90,7 +91,8 @@ public class GTScanReqSerDerTest extends LocalFileMetadataTestCase {
CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("test_kylin_cube_with_slr_ready");
CubeSegment segment = cube.getFirstSegment();
- GTInfo info = CubeGridTable.newGTInfo(segment, Cuboid.getBaseCuboidId(cube.getDescriptor()));
+ Cuboid baseCuboid = Cuboid.getBaseCuboid(cube.getDescriptor());
+ GTInfo info = CubeGridTable.newGTInfo(baseCuboid, new CubeDimEncMap(segment));
GTInfo.serializer.serialize(info, buffer);
buffer.flip();
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 998f1db..bb17054 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,10 +18,12 @@
package org.apache.kylin.storage;
+import java.util.Comparator;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
import org.slf4j.Logger;
@@ -48,6 +50,9 @@ public class StorageContext {
private boolean needStorageAggregation = false;
private boolean enableCoprocessor = false;
+ private boolean enableStreamAggregate = false;
+ private Comparator<GTRecord> groupKeyComparator;
+
private IStorageQuery storageQuery;
private AtomicLong processedRowCount = new AtomicLong();
private Cuboid cuboid;
@@ -230,4 +235,19 @@ public class StorageContext {
this.storageQuery = storageQuery;
}
+ public boolean isStreamAggregateEnabled() {
+ return enableStreamAggregate;
+ }
+
+ public void enableStreamAggregate() {
+ this.enableStreamAggregate = true;
+ }
+
+ public Comparator<GTRecord> getGroupKeyComparator() {
+ return groupKeyComparator;
+ }
+
+ public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
+ this.groupKeyComparator = groupKeyComparator;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 6911827..c3cc858 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -38,6 +38,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.RecordComparators;
import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
@@ -85,7 +86,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
Set<TblColRef> filterDims = Sets.newHashSet();
TupleFilter.collectColumns(filter, filterDims);
- this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId());
+ this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment));
CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
IGTComparator comp = gtInfo.getCodeSystem().getComparator();
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 4f206d4..31a9f99 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -78,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner {
}
scanRequest = scanRangePlanner.planScanRequest();
String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
- scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
+ scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
}
@Override
@@ -96,8 +96,7 @@ public class CubeSegmentScanner implements IGTScanner {
return scanRequest == null ? null : scanRequest.getInfo();
}
- public CubeSegment getSegment() {
- return this.cubeSeg;
+ public GTScanRequest getScanRequest() {
+ return scanRequest;
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 280718f..b762e5c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -28,10 +28,8 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -43,7 +41,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
- * convert GTRecord to tuple
+ * Convert Object[] (decoded GTRecord) to tuple
*/
public class CubeTupleConverter implements ITupleConverter {
@@ -54,7 +52,6 @@ public class CubeTupleConverter implements ITupleConverter {
private final int[] gtColIdx;
private final int[] tupleIdx;
- private final Object[] gtValues;
private final MeasureType<?>[] measureTypes;
private final List<IAdvMeasureFiller> advMeasureFillers;
@@ -63,19 +60,16 @@ public class CubeTupleConverter implements ITupleConverter {
private final int nSelectedDims;
public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
- Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx,
+ TupleInfo returnTupleInfo) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
+ this.gtColIdx = gtColIdx;
this.tupleInfo = returnTupleInfo;
this.derivedColFillers = Lists.newArrayList();
- List<TblColRef> cuboidDims = cuboid.getColumns();
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
nSelectedDims = selectedDimensions.size();
- gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
- gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
// measure types don't have this many, but aligned length make programming easier
measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()];
@@ -89,21 +83,11 @@ public class CubeTupleConverter implements ITupleConverter {
// pre-calculate dimension index mapping to tuple
for (TblColRef dim : selectedDimensions) {
- int dimIndex = mapping.getIndexOf(dim);
- gtColIdx[i] = dimIndex;
tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
-
- // if (tupleIdx[iii] == -1) {
- // throw new IllegalStateException("dim not used in tuple:" + dim);
- // }
-
i++;
}
for (FunctionDesc metric : selectedMetrics) {
- int metricIndex = mapping.getIndexOf(metric);
- gtColIdx[i] = metricIndex;
-
if (metric.needRewrite()) {
String rewriteFieldName = metric.getRewriteFieldName();
tupleIdx[i] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
@@ -126,7 +110,7 @@ public class CubeTupleConverter implements ITupleConverter {
}
// prepare derived columns and filler
- Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
+ Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboid.getColumns(), null);
for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
TblColRef[] hostCols = entry.getKey().data;
for (DeriveInfo deriveInfo : entry.getValue()) {
@@ -148,9 +132,8 @@ public class CubeTupleConverter implements ITupleConverter {
}
@Override
- public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) {
-
- record.getValues(gtColIdx, gtValues);
+ public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple) {
+ assert gtValues.length == gtColIdx.length;
// dimensions
for (int i = 0; i < nSelectedDims; i++) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index ecf1ad3..82590a2 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,15 +26,18 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.RawQueryLastHacker;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -120,6 +123,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
// set limit push down
enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context);
+ // set whether to aggregate results from multiple partitions
+ enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
// set query deadline
context.setDeadline(cubeInstance);
@@ -144,8 +149,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
protected abstract String getGTStorage();
- protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo tupleInfo) {
- return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+ protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx, TupleInfo tupleInfo) {
+ return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo);
}
protected void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -366,6 +371,35 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
+    /**
+     * Decides whether results from multiple storage partitions should be stream-aggregated and,
+     * if so, enables it on {@code context} together with a {@code GTRecord} comparator over the
+     * group-by columns.
+     *
+     * <p>The decision starts from the cube config's {@code isStreamAggregateEnabled()} flag and is
+     * turned off when any shard-by column is also a group-by column, or when the context reports
+     * that no storage aggregation is needed.
+     *
+     * @param cuboid  the queried cuboid; supplies the cube desc and the cuboid-to-grid-table mapping
+     * @param groupsD group-by columns (presumably after derived-column expansion -- confirm)
+     * @param context storage context that receives the decision and the group key comparator
+     */
+    private void enableStreamAggregateIfBeneficial(Cuboid cuboid, Set<TblColRef> groupsD, StorageContext context) {
+        CubeDesc cubeDesc = cuboid.getCubeDesc();
+        boolean enabled = cubeDesc.getConfig().isStreamAggregateEnabled();
+
+        // collect the shard-by columns that also appear among the group-by columns
+        Set<TblColRef> shardByInGroups = Sets.newHashSet();
+        for (TblColRef col : cubeDesc.getShardByColumns()) {
+            if (groupsD.contains(col)) {
+                shardByInGroups.add(col);
+            }
+        }
+        // NOTE(review): presumably rows of one group then stay within one shard, so partitions
+        // share few groups and merging them gains little -- confirm
+        if (!shardByInGroups.isEmpty()) {
+            enabled = false;
+            logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: " + shardByInGroups);
+        }
+
+        if (!context.isNeedStorageAggregation()) {
+            enabled = false;
+            logger.debug("Aggregate partition results is not beneficial because no storage aggregation");
+        }
+
+        if (enabled) {
+            // map the group-by columns to grid-table columns so records can be compared by group key
+            CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+            ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
+
+            context.enableStreamAggregate();
+            context.setGroupKeyComparator(GTRecord.getComparator(cols));
+        }
+    }
+
protected void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
Map<String, List<MeasureDesc>> map = Maps.newHashMap();
for (MeasureDesc measure : cubeDesc.getMeasures()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
index 9c50d0c..dd48e4d 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
@@ -20,11 +20,10 @@ package org.apache.kylin.storage.gtrecord;
import java.util.List;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.tuple.Tuple;
public interface ITupleConverter {
- public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple);
+ public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
new file mode 100644
index 0000000..474e1e0
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterates over the {@code GTRecord}s decoded one after another from a single storage partition's result bytes.
+ *
+ * <p>Note that next() returns the same record object every time, reloaded in place from the buffer.
+ * Clients must copy the returned record if they need it after the following call to next().
+ */
+public class PartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+    private final ByteBuffer buffer; // assumes loadColumns() advances the position past each record
+    private final ImmutableBitSet cols; // columns loaded into the record for each entry
+    private final GTRecord record; // reuse to avoid object creation
+
+    public PartitionResultIterator(byte[] data, GTInfo info, ImmutableBitSet cols) {
+        this.buffer = ByteBuffer.wrap(data); // wraps data without copying it
+        this.cols = cols;
+        this.record = new GTRecord(info);
+    }
+
+    @Override
+    public boolean hasNext() {
+        return buffer.hasRemaining(); // another record follows while unread bytes remain
+    }
+
+    @Override
+    public GTRecord next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        record.loadColumns(cols, buffer); // decode the next record from the buffer into the reused instance
+        return record;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
new file mode 100644
index 0000000..52029d3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sorts the {@code GTRecord}s of all storage partitions into one ordered stream.
+ *
+ * <p>Each partition is assumed to be sorted already by {@code comparator}; records are only
+ * interleaved (a k-way merge over a priority queue), never aggregated or de-duplicated.
+ *
+ * <p>The iterator returned by {@link #iterator()} reuses a single {@code GTRecord} for every
+ * {@code next()} call; callers must copy a record they want to keep.
+ */
+public class PartitionResultMerger implements Iterable<GTRecord> {
+    private final ImmutableList<PartitionResultIterator> partitionResults;
+    private final GTInfo info;
+    private final Comparator<GTRecord> comparator;
+
+    /**
+     * Creates a merger over the given partition results.
+     *
+     * @param partitionResults per-partition iterators, each sorted by {@code comparator}; may be empty
+     * @param info             grid table info used to allocate the reused output record
+     * @param comparator       ordering shared by all partitions
+     */
+    public PartitionResultMerger(
+            Iterable<PartitionResultIterator> partitionResults,
+            GTInfo info, Comparator<GTRecord> comparator) {
+        this.partitionResults = ImmutableList.copyOf(partitionResults);
+        this.info = info;
+        this.comparator = comparator;
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        if (partitionResults.size() == 1) {
+            // a single sorted partition needs no merging
+            return partitionResults.get(0);
+        }
+        return new MergingResultsIterator();
+    }
+
+    private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
+        private final GTRecord record = new GTRecord(info); // reuse to avoid object creation
+
+        // min-heap of non-exhausted partitions, ordered by each partition's next (peeked) record
+        private final PriorityQueue<PeekingIterator<GTRecord>> heap;
+
+        MergingResultsIterator() {
+            Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+                @Override
+                public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+                    return comparator.compare(o1.peek(), o2.peek());
+                }
+            };
+            // PriorityQueue rejects an initial capacity below 1, which is what an empty partition list would give
+            this.heap = new PriorityQueue<>(Math.max(1, partitionResults.size()), heapComparator);
+
+            for (PartitionResultIterator it : partitionResults) {
+                if (it.hasNext()) {
+                    heap.offer(Iterators.peekingIterator(it));
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !heap.isEmpty();
+        }
+
+        @Override
+        public GTRecord next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            // take the partition whose next record is smallest
+            PeekingIterator<GTRecord> it = heap.poll();
+            // WATCH OUT! the record returned by PartitionResultIterator.next() is reused and may change
+            // once that partition advances (e.g. the heap peeks it when it is re-offered below),
+            // so we must make a shallow copy of it first.
+            record.shallowCopyFrom(it.next());
+
+            if (it.hasNext()) {
+                // re-insert so the heap orders this partition by its new head record
+                heap.offer(it);
+            }
+
+            return record;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
index 9e89227..fe22e9c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
@@ -30,6 +30,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStorage;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ public class ScannerWorker {
private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
private IGTScanner internal = null;
- public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) {
+ public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) {
if (scanRequest == null) {
logger.info("Segment {} will be skipped", segment);
internal = new EmptyGTScanner();
@@ -48,7 +49,7 @@ public class ScannerWorker {
final GTInfo info = scanRequest.getInfo();
try {
- IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior
+ IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior
internal = rpc.getGTScanner(scanRequest);
} catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 37699a3..11f766c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -24,8 +24,14 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
+import com.google.common.collect.UnmodifiableIterator;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTStreamAggregateScanner;
+import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -49,7 +55,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
protected final Tuple tuple;
protected final StorageContext context;
- protected Iterator<GTRecord> gtItr;
+ protected Iterator<Object[]> gtValues;
protected ITupleConverter cubeTupleConverter;
protected Tuple next;
@@ -66,12 +72,62 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
this.tupleInfo = returnTupleInfo;
this.tuple = new Tuple(returnTupleInfo);
this.context = context;
- this.gtItr = getGTItr(scanner);
- this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+
+ CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+ int[] gtDimsIdx = mapping.getDimIndexes(selectedDimensions);
+ int[] gtMetricsIdx = mapping.getMetricsIndexes(selectedMetrics);
+ // gtColIdx = gtDimsIdx + gtMetricsIdx
+ int[] gtColIdx = new int[gtDimsIdx.length + gtMetricsIdx.length];
+ System.arraycopy(gtDimsIdx, 0, gtColIdx, 0, gtDimsIdx.length);
+ System.arraycopy(gtMetricsIdx, 0, gtColIdx, gtDimsIdx.length, gtMetricsIdx.length);
+
+ this.gtValues = getGTValuesIterator(scanner.iterator(), scanner.getScanRequest(), gtDimsIdx, gtMetricsIdx);
+ this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(
+ scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo);
}
- private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) {
- return scanner.iterator();
+    /**
+     * Converts the scanner's {@code GTRecord}s into arrays of decoded column values, laid out as the
+     * {@code gtDimsIdx} columns followed by the {@code gtMetricsIdx} columns.
+     *
+     * <p>If stream aggregation is enabled on the context and {@code records} is not a single
+     * {@code PartitionResultIterator}, the records are first aggregated by a
+     * {@code GTStreamAggregateScanner} using the context's group key comparator; otherwise every
+     * record is decoded as-is.
+     *
+     * <p>The plain-decoding iterator returns the same {@code Object[]} from every {@code next()} call.
+     *
+     * @param records      records produced by the segment scanner
+     * @param scanRequest  scan request supplying the grid-table info; also passed to the aggregator
+     * @param gtDimsIdx    grid-table indexes of the selected dimensions
+     * @param gtMetricsIdx grid-table indexes of the selected metrics
+     * @return iterator over decoded value arrays
+     */
+    private Iterator<Object[]> getGTValuesIterator(
+            final Iterator<GTRecord> records, final GTScanRequest scanRequest,
+            final int[] gtDimsIdx, final int[] gtMetricsIdx) {
+
+        boolean singlePartitionResult = records instanceof PartitionResultIterator;
+        if (context.isStreamAggregateEnabled() && !singlePartitionResult) {
+            // input records are ordered, leverage stream aggregator to produce possibly fewer records
+            // wrap the plain iterator as the IGTScanner the aggregator consumes; close() is a no-op
+            IGTScanner inputScanner = new IGTScanner() {
+                public GTInfo getInfo() {
+                    return scanRequest.getInfo();
+                }
+
+                public void close() throws IOException {}
+
+                public Iterator<GTRecord> iterator() {
+                    return records;
+                }
+            };
+            GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
+                    inputScanner, scanRequest, context.getGroupKeyComparator());
+            // NOTE(review): assumes valuesIterator() yields the same dims-then-metrics layout -- confirm
+            return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
+        }
+
+        // simply decode records
+        return new UnmodifiableIterator<Object[]>() {
+            Object[] result = new Object[gtDimsIdx.length + gtMetricsIdx.length]; // reused for every next()
+
+            public boolean hasNext() {
+                return records.hasNext();
+            }
+
+            public Object[] next() {
+                GTRecord record = records.next();
+                // dimensions first ...
+                for (int i = 0; i < gtDimsIdx.length; i++) {
+                    result[i] = record.decodeValue(gtDimsIdx[i]);
+                }
+                // ... then metrics, placed right after the dimensions
+                for (int i = 0; i < gtMetricsIdx.length; i++) {
+                    result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]);
+                }
+                return result;
+            }
+        };
}
@Override
@@ -91,13 +147,13 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
}
// now we have a GTRecord
- if (!gtItr.hasNext()) {
+ if (!gtValues.hasNext()) {
return false;
}
- GTRecord curRecord = gtItr.next();
+ Object[] gtValues = this.gtValues.next();
// translate into tuple
- advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple);
+ advMeasureFillers = cubeTupleConverter.translateResult(gtValues, tuple);
// the simple case
if (advMeasureFillers == null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 1a80bbf..0f1e191 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -18,22 +18,20 @@
package org.apache.kylin.storage.gtrecord;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
/**
* scatter the blob returned from region server to a iterable of gtrecords
@@ -42,18 +40,20 @@ public class StorageResponseGTScatter implements IGTScanner {
private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class);
- private GTInfo info;
+ private final GTInfo info;
private IPartitionStreamer partitionStreamer;
- private Iterator<byte[]> blocks;
- private ImmutableBitSet columns;
- private int storagePushDownLimit = -1;
+ private final Iterator<byte[]> blocks;
+ private final ImmutableBitSet columns;
+ private final StorageContext context;
+ private final boolean needSorted; // whether scanner should return sorted records
- public StorageResponseGTScatter(GTInfo info, IPartitionStreamer partitionStreamer, ImmutableBitSet columns, int storagePushDownLimit) {
- this.info = info;
+ public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
+ this.info = scanRequest.getInfo();
this.partitionStreamer = partitionStreamer;
this.blocks = partitionStreamer.asByteArrayIterator();
- this.columns = columns;
- this.storagePushDownLimit = storagePushDownLimit;
+ this.columns = scanRequest.getColumns();
+ this.context = context;
+ this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled();
}
@Override
@@ -69,48 +69,18 @@ public class StorageResponseGTScatter implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
- Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc());
- if (StorageContext.mergeSortPartitionResults(storagePushDownLimit)) {
- logger.info("Using SortedIteratorMergerWithLimit to merge partition results");
- return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
- } else {
- logger.info("Using Iterators.concat to merge partition results");
- return Iterators.concat(shardSubsets);
+ List<PartitionResultIterator> partitionResults = Lists.newArrayList();
+ while (blocks.hasNext()) {
+ partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
}
- }
-
- class EndpointResponseGTScatterFunc implements Function<byte[], Iterator<GTRecord>> {
- @Nullable
- @Override
- public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
- return new Iterator<GTRecord>() {
- private ByteBuffer inputBuffer = null;
- //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
- private GTRecord firstRecord = null;
-
- @Override
- public boolean hasNext() {
- if (inputBuffer == null) {
- inputBuffer = ByteBuffer.wrap(input);
- firstRecord = new GTRecord(info);
- }
- return inputBuffer.position() < inputBuffer.limit();
- }
-
- @Override
- public GTRecord next() {
- firstRecord.loadColumns(columns, inputBuffer);
- return firstRecord;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ if (!needSorted) {
+ logger.debug("Using Iterators.concat to merge partition results");
+ return Iterators.concat(partitionResults.iterator());
}
- }
+ logger.debug("Using PartitionResultMerger to merge partition results");
+ PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
+ return merger.iterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 82b67b6..e822ada 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -47,6 +47,7 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -69,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private static ExecutorService executorService = new LoggableCachedThreadPool();
- public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
- super(segment, cuboid, fullGTInfo);
+ public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
+ super(segment, cuboid, fullGTInfo, context);
}
private byte[] getByteArrayForShort(short v) {
@@ -245,7 +246,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
});
}
- return new StorageResponseGTScatter(fullGTInfo, new DummyPartitionStreamer(epResultItr), scanRequest.getColumns(), scanRequest.getStoragePushDownLimit());
+ return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext);
}
private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 88e7176..db81646 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.IGTStorage;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,17 +65,19 @@ public abstract class CubeHBaseRPC implements IGTStorage {
final protected Cuboid cuboid;
final protected GTInfo fullGTInfo;
final protected QueryContext queryContext;
+ final protected StorageContext storageContext;
final private RowKeyEncoder fuzzyKeyEncoder;
final private RowKeyEncoder fuzzyMaskEncoder;
- public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
+ public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment");
this.cubeSeg = (CubeSegment) segment;
this.cuboid = cuboid;
this.fullGTInfo = fullGTInfo;
this.queryContext = QueryContext.current();
+ this.storageContext = context;
this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 33f8d90..951e2ef 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,8 +88,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
}
- public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) {
- super(segment, cuboid, fullGTInfo);
+ public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) {
+ super(segment, cuboid, fullGTInfo, context);
}
@Override
[04/21] kylin git commit: KYLIN-2510 Fix unintended NPE in CubeMetaExtractor
Posted by li...@apache.org.
KYLIN-2510 Fix unintended NPE in CubeMetaExtractor
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8be842e9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8be842e9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8be842e9
Branch: refs/heads/KYLIN-2501
Commit: 8be842e9b845477541c2569ae1c2484e9d627214
Parents: dde297e
Author: lidongsjtu <li...@apache.org>
Authored: Thu Mar 16 09:44:47 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Mar 16 09:44:54 2017 +0800
----------------------------------------------------------------------
tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/8be842e9/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
index e370e48..188524d 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
@@ -65,6 +65,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -172,6 +173,7 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
String projectNames = optionsHelper.getOptionValue(OPTION_PROJECT);
for (String projectName : projectNames.split(",")) {
ProjectInstance projectInstance = projectManager.getProject(projectName);
+ Preconditions.checkNotNull(projectInstance, "Project " + projectName + " does not exist.");
requireProject(projectInstance);
}
} else if (optionsHelper.hasOption(OPTION_CUBE)) {
@@ -202,9 +204,6 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
}
private void requireProject(ProjectInstance projectInstance) throws IOException {
- if (projectInstance == null) {
- throw new IllegalArgumentException("Project " + projectInstance.getName() + " does not exist");
- }
addRequired(projectInstance.getResourcePath());
List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
for (RealizationEntry realizationEntry : realizationEntries) {
[19/21] kylin git commit: KYLIN-2518 Optimize put row key to hll
Posted by li...@apache.org.
KYLIN-2518 Optimize put row key to hll
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c218214
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c218214
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c218214
Branch: refs/heads/KYLIN-2501
Commit: 4c21821471cb261cfecdf8289c5f8284af817b3e
Parents: b1cc0dd
Author: xiefan46 <95...@qq.com>
Authored: Mon Mar 27 18:13:03 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 29 11:01:24 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/measure/hllc/HLLCounter.java | 54 ++--
.../mr/steps/FactDistinctColumnsMapper.java | 31 +-
.../mr/steps/NewCubeSamplingMethodTest.java | 299 +++++++++++++++++++
3 files changed, 341 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index 82c881b..b793465 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -60,7 +60,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
merge(another);
}
- HLLCounter(int p, RegisterType type) {
+ public HLLCounter(int p, RegisterType type) {
this(p, type, Hashing.murmur3_128());
}
@@ -99,6 +99,10 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
add(hashFunc.hashBytes(value, offset, length).asLong());
}
+    /**
+     * Adds an already-computed 64-bit hash to this counter, skipping {@code hashFunc}.
+     *
+     * <p>NOTE(review): presumably the hash must come from the same hash function this counter was
+     * built with (see {@code hashFunc}) for estimates to stay consistent -- confirm with callers.
+     */
+    public void addHashDirectly(long hash){
+        add(hash);
+    }
+
protected void add(long hash) {
int bucketMask = m - 1;
int bucket = (int) (hash & bucketMask);
@@ -126,7 +130,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
}
private void toDenseIfNeeded() {
- if (register instanceof SparseRegister) {
+ if (register.getRegisterType() == RegisterType.SPARSE) {
if (isDense(register.getSize())) {
register = ((SparseRegister) register).toDense(p);
}
@@ -137,36 +141,36 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
assert this.p == another.p;
assert this.hashFunc == another.hashFunc;
switch (register.getRegisterType()) {
- case SINGLE_VALUE:
- switch (another.getRegisterType()) {
case SINGLE_VALUE:
- if (register.getSize() > 0 && another.register.getSize() > 0) {
- register = ((SingleValueRegister) register).toSparse();
- } else {
- SingleValueRegister sr = (SingleValueRegister) another.register;
- if (sr.getSize() > 0)
- register.set(sr.getSingleValuePos(), sr.getValue());
- return;
+ switch (another.getRegisterType()) {
+ case SINGLE_VALUE:
+ if (register.getSize() > 0 && another.register.getSize() > 0) {
+ register = ((SingleValueRegister) register).toSparse();
+ } else {
+ SingleValueRegister sr = (SingleValueRegister) another.register;
+ if (sr.getSize() > 0)
+ register.set(sr.getSingleValuePos(), sr.getValue());
+ return;
+ }
+ break;
+ case SPARSE:
+ register = ((SingleValueRegister) register).toSparse();
+ break;
+ case DENSE:
+ register = ((SingleValueRegister) register).toDense(this.p);
+ break;
+ default:
+ break;
}
+
break;
case SPARSE:
- register = ((SingleValueRegister) register).toSparse();
- break;
- case DENSE:
- register = ((SingleValueRegister) register).toDense(this.p);
+ if (another.getRegisterType() == RegisterType.DENSE) {
+ register = ((SparseRegister) register).toDense(p);
+ }
break;
default:
break;
- }
-
- break;
- case SPARSE:
- if (another.getRegisterType() == RegisterType.DENSE) {
- register = ((SparseRegister) register).toDense(p);
- }
- break;
- default:
- break;
}
register.merge(another.register);
toDenseIfNeeded();
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 9f65163..e6cea2b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -24,13 +24,13 @@ import java.util.Collection;
import java.util.List;
import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -62,7 +62,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
private HashFunction hf = null;
private int rowCount = 0;
private int samplingPercentage;
- private ByteArray[] row_hashcodes = null;
+ //private ByteArray[] row_hashcodes = null;
+ private long[] rowHashCodesLong = null;
private ByteBuffer tmpbuf;
private static final Text EMPTY_TEXT = new Text();
public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
@@ -92,14 +93,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
allCuboidsHLL = new HLLCounter[cuboidIds.length];
for (int i = 0; i < cuboidIds.length; i++) {
- allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+ allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
}
- hf = Hashing.murmur3_32();
- row_hashcodes = new ByteArray[nRowKey];
- for (int i = 0; i < nRowKey; i++) {
- row_hashcodes[i] = new ByteArray();
- }
+ hf = Hashing.murmur3_128();
+ rowHashCodesLong = new long[nRowKey];
TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
if (partitionColRef != null) {
@@ -211,26 +209,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
private void putRowKeyToHLL(String[] row) {
-
//generate hash for each row key column
for (int i = 0; i < nRowKey; i++) {
Hasher hc = hf.newHasher();
String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
- if (colValue != null) {
- row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
- } else {
- row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
- }
+ if (colValue == null) // NOTE(review): null now hashes exactly like the string "0"; the removed code used putInt(0) instead
+ colValue = "0";
+ byte[] bytes = hc.putString(colValue).hash().asBytes();
+ rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);// NOTE(review): adding the ordinal AFTER hashing does not distinguish (a,b) from (b,a) - both rows sum to h(a) + h(b) + 1 below; mixing i into the hasher input (e.g. hc.putInt(i)) would
}
// user the row key column hash to get a consolidated hash for each cuboid
for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
- Hasher hc = hf.newHasher();
+ long value = 0; // cuboid hash = sum of its columns' hashes (long addition wraps on overflow)
for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+ value += rowHashCodesLong[allCuboidsBitSet[i][position]];
}
-
- allCuboidsHLL[i].add(hc.hash().asBytes());
+ allCuboidsHLL[i].addHashDirectly(value); // the summed value is used as the HLL hash as-is, without re-hashing
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
new file mode 100644
index 0000000..f018f28
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@Ignore // manual benchmark, excluded from normal runs: compares old per-cuboid HLL hashing (murmur3_32 column hashes concatenated and re-hashed) with the new one (murmur3_128 column hashes summed into a long and passed to addHashDirectly), for speed and accuracy
+public class NewCubeSamplingMethodTest {
+
+ private static final int ROW_LENGTH = 10; // columns per synthetic row
+
+ private Integer[][] allCuboidsBitSet; // per cuboid: indices of the columns it contains (filled in setup)
+
+ private long baseCuboidId; // low ROW_LENGTH bits set, i.e. every column
+
+ private final int rowCount = 500000; // rows per generated dataset
+
+ @Before
+ public void setup() {
+ baseCuboidId = (1L << ROW_LENGTH) - 1;
+ createAllCuboidBitSet(); // cuboid ids 1 .. baseCuboidId - 1; the base cuboid itself is not included
+ System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+ }
+ // Random data: each of the ROW_LENGTH columns is a random 10-character string.
+ @Ignore
+ @Test
+ public void testRandomData() throws Exception {
+ List<List<String>> dataSet = getRandomDataset(rowCount);
+ comparePerformanceBasic(dataSet);
+ compareAccuracyBasic(dataSet);
+ }
+
+ // Small-cardinality data: column 0 is one of four fixed values and every other column is "abc", so at most 4 distinct rows.
+ @Ignore
+ @Test
+ public void testSmallCardData() throws Exception {
+ List<List<String>> dataSet = getSmallCardDataset(rowCount);
+ comparePerformanceBasic(dataSet);
+ compareAccuracyBasic(dataSet);
+ }
+
+ // Times the old and new per-cuboid update paths over every row and prints the wall-clock milliseconds of each.
+ public void comparePerformanceBasic(final List<List<String>> rows) throws Exception {
+ // old method: murmur3_32 per column, then each cuboid re-hashes its columns' hash bytes
+ ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+ HLLCounter[] cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+ long start = System.currentTimeMillis();
+ for (List<String> row : rows) {
+ putRowKeyToHLL(row, colHashValues, cuboidCounters, Hashing.murmur3_32());
+ }
+ long totalTime = System.currentTimeMillis() - start;
+ System.out.println("old method cost time : " + totalTime);
+ // new method: murmur3_128 per column (column index prefixed), then each cuboid sums its columns' longs
+ colHashValues = getNewColHashValues(ROW_LENGTH); // NOTE(review): not used by the new method below
+ cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+ start = System.currentTimeMillis();
+ long[] valueHashLong = new long[allCuboidsBitSet.length]; // NOTE(review): only the first ROW_LENGTH slots are written; ROW_LENGTH would suffice
+ for (List<String> row : rows) {
+ putRowKeyToHLLNew(row, valueHashLong, cuboidCounters, Hashing.murmur3_128());
+ }
+ totalTime = System.currentTimeMillis() - start;
+ System.out.println("new method cost time : " + totalTime);
+ }
+
+ // Accuracy: estimates the distinct count of whole rows (all columns) with each method and prints the relative error against countCardinality.
+ public void compareAccuracyBasic(final List<List<String>> rows) throws Exception {
+ final long realCardinality = countCardinality(rows);
+ System.out.println("real cardinality : " + realCardinality);
+ // old method; NOTE(review): the returned timings t1/t2 are never printed or checked
+ long t1 = runAndGetTime(new TestCase() {
+ @Override
+ public void run() throws Exception {
+ HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+ final ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+ HashFunction hf = Hashing.murmur3_32();
+ for (List<String> row : rows) {
+
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hf.newHasher();
+ colHashValues[x++].set(hc.putString(field).hash().asBytes());
+ }
+ // combine: hash the column hashes in column order
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < colHashValues.length; position++) {
+ hc.putBytes(colHashValues[position].array());
+ }
+ counter.add(hc.hash().asBytes());
+ }
+ long estimate = counter.getCountEstimate();
+ System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+ }
+ });
+
+ // new method
+ long t2 = runAndGetTime(new TestCase() {
+ @Override
+ public void run() throws Exception {
+ HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+ HashFunction hf2 = Hashing.murmur3_128();
+ long[] valueHashLong = new long[allCuboidsBitSet.length]; // NOTE(review): only row.size() slots are written
+ for (List<String> row : rows) {
+
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hf2.newHasher();
+ byte[] bytes = hc.putString(x + field).hash().asBytes(); // column index is part of the hashed input, so swapped values hash differently
+ valueHashLong[x++] = Bytes.toLong(bytes);
+ }
+ // combine: sum the per-column longs (long addition wraps on overflow)
+ long value = 0;
+ for (int position = 0; position < row.size(); position++) {
+ value += valueHashLong[position];
+ }
+ counter.addHashDirectly(value);
+ }
+ long estimate = counter.getCountEstimate();
+ System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+ }
+ });
+ }
+ // Fills allCuboidsBitSet with the column indices of every cuboid id in [1, baseCuboidId).
+ public void createAllCuboidBitSet() {
+ List<Long> allCuboids = Lists.newArrayList(); // NOTE(review): filled but never read
+ List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+ for (long i = 1; i < baseCuboidId; i++) {
+ allCuboids.add(i);
+ addCuboidBitSet(i, allCuboidsBitSetList);
+ }
+ allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+ }
+ // One reusable ByteArray holder per column.
+ private ByteArray[] getNewColHashValues(int rowLength) {
+ ByteArray[] colHashValues = new ByteArray[rowLength];
+ for (int i = 0; i < rowLength; i++) {
+ colHashValues[i] = new ByteArray();
+ }
+ return colHashValues;
+ }
+ // One dense HLL counter with precision 14 per cuboid.
+ private HLLCounter[] getNewCuboidCounters(int cuboidNum) {
+ HLLCounter[] counters = new HLLCounter[cuboidNum];
+ for (int i = 0; i < counters.length; i++)
+ counters[i] = new HLLCounter(14, RegisterType.DENSE);
+ return counters;
+ }
+
+ // Appends the column indices whose bits are set in cuboidId; index 0 is the highest bit of baseCuboidId.
+ private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
+ Integer[] indice = new Integer[Long.bitCount(cuboidId)]; // one slot per set bit
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < ROW_LENGTH; i++) {
+ if ((mask & cuboidId) > 0) {
+ indice[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+
+ allCuboidsBitSet.add(indice);
+
+ }
+ // Runs the case once and returns the elapsed wall-clock milliseconds.
+ private long runAndGetTime(TestCase testCase) throws Exception {
+ long start = System.currentTimeMillis();
+ testCase.run();
+ long totalTime = System.currentTimeMillis() - start;
+ return totalTime;
+ }
+ // A timed body for runAndGetTime.
+ interface TestCase {
+ void run() throws Exception;
+ }
+ // Old path: hash each column, then per cuboid re-hash the concatenated column-hash bytes and add that to the cuboid's counter.
+ private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hashFunction.newHasher();
+ colHashValues[x++].set(hc.putString(field).hash().asBytes());
+ }
+
+ for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hashFunction.newHasher();
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ hc.putBytes(colHashValues[allCuboidsBitSet[i][position]].array());
+ // NOTE(review): no separator is fed between column hashes here
+ }
+ cuboidCounters[i].add(hc.hash().asBytes());
+ }
+ }
+ // New path: 64-bit hash of (column index + value) per column, summed per cuboid and added via addHashDirectly.
+ private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hashFunction.newHasher();
+ byte[] bytes = hc.putString(x + field).hash().asBytes();
+ hashValuesLong[x++] = Bytes.toLong(bytes);
+ }
+
+ for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+ long value = 0;
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ value += hashValuesLong[allCuboidsBitSet[i][position]];
+ }
+ cuboidCounters[i].addHashDirectly(value);
+ }
+ }
+ // size rows produced by getRandomRow.
+ private List<List<String>> getRandomDataset(int size) {
+ List<List<String>> rows = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ rows.add(getRandomRow());
+ }
+ return rows;
+ }
+ // size rows produced by getSmallCardRow.
+ private List<List<String>> getSmallCardDataset(int size) {
+ List<List<String>> rows = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ rows.add(getSmallCardRow());
+ }
+ return rows;
+ }
+ // ROW_LENGTH random 10-character strings.
+ private List<String> getRandomRow() {
+ List<String> row = new ArrayList<>();
+ for (int i = 0; i < ROW_LENGTH; i++) {
+ row.add(RandomStringUtils.random(10));
+ }
+ return row;
+ }
+
+ private String[] smallCardRow = {"abc", "bcd", "jifea", "feaifj"}; // candidate values for column 0 of the small-cardinality dataset
+
+ private Random rand = new Random(System.currentTimeMillis()); // seeded from the clock, so datasets differ between runs
+
+ private List<String> getSmallCardRow() {
+ List<String> row = new ArrayList<>();
+ row.add(smallCardRow[rand.nextInt(smallCardRow.length)]);
+ for (int i = 1; i < ROW_LENGTH; i++) {
+ row.add("abc");
+ }
+ return row;
+ }
+
+ // Exact number of distinct rows. NOTE(review): columns are concatenated without a separator, so ("ab","c") and ("a","bc") count as the same row.
+ private int countCardinality(List<List<String>> rows) {
+ Set<String> diffCols = new HashSet<String>();
+ for (List<String> row : rows) {
+ StringBuilder sb = new StringBuilder();
+ for (String str : row) {
+ sb.append(str);
+ }
+ diffCols.add(sb.toString());
+ }
+ return diffCols.size();
+ }
+ // Relative error |estimate - real| / real.
+ private double countErrorRate(long estimate, long real) {
+ double rate = Math.abs((estimate - real) * 1.0) / real;
+ return rate;
+ }
+}
[05/21] kylin git commit: minor,
process NoSuchMethodError with lower version Kafka consumer
Posted by li...@apache.org.
minor, process NoSuchMethodError with lower version Kafka consumer
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2ab9bf2d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2ab9bf2d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2ab9bf2d
Branch: refs/heads/KYLIN-2501
Commit: 2ab9bf2d88aae2e8fa6189e8e375e140e9dbb4b5
Parents: 8be842e
Author: Billy Liu <bi...@apache.org>
Authored: Wed Mar 15 21:03:47 2017 -0700
Committer: Billy Liu <bi...@apache.org>
Committed: Wed Mar 15 21:03:47 2017 -0700
----------------------------------------------------------------------
.../kylin/source/kafka/config/KafkaConsumerProperties.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/2ab9bf2d/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 5b3dd87..cc32ed9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -79,8 +79,8 @@ public class KafkaConsumerProperties {
Set<String> configNames = new HashSet<String>();
try {
configNames = ConsumerConfig.configNames();
- } catch (Exception e) {
- // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException
+ } catch (Error e) {
+ // ConsumerConfig.configNames() only exists in Kafka 0.10.1.0+; with older clients the call fails with NoSuchMethodError, which is an Error rather than an Exception
String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
+ " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(",");
configNames.addAll(Arrays.asList(configNamesArray));
[14/21] kylin git commit: minor change on kylin pom
Posted by li...@apache.org.
minor change on kylin pom
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c82603a7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c82603a7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c82603a7
Branch: refs/heads/KYLIN-2501
Commit: c82603a7f4a0f8b1128cfe057a12b922cd1282f8
Parents: 3ae6f9c
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Mar 20 19:54:38 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 20 19:54:38 2017 +0800
----------------------------------------------------------------------
pom.xml | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c82603a7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9a56dde..dec04bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,9 +62,8 @@
<!-- Spark versions -->
<spark.version>1.6.3</spark.version>
<kryo.version>4.0.0</kryo.version>
- <!--
- <reflections.version>0.9.10</reflections.version>
- -->
+
+ <!-- <reflections.version>0.9.10</reflections.version> -->
<!-- Calcite Version -->
<calcite.version>1.11.0</calcite.version>
[11/21] kylin git commit: KYLIN-2514 reorder joinTable first
Posted by li...@apache.org.
KYLIN-2514 reorder joinTable first
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/08600484
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/08600484
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/08600484
Branch: refs/heads/KYLIN-2501
Commit: 0860048484639aa07f745726a58c8544a911e5f6
Parents: 19c87e7
Author: Roger Shi <ro...@hotmail.com>
Authored: Sat Mar 18 14:58:25 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Sat Mar 18 17:29:20 2017 +0800
----------------------------------------------------------------------
.../kylin/metadata/model/DataModelDesc.java | 43 +++++++++++++++++++-
.../apache/kylin/metadata/model/JoinsTree.java | 42 ++-----------------
2 files changed, 45 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/08600484/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 0a303ec..a26ccce 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -19,11 +19,13 @@
package org.apache.kylin.metadata.model;
import java.io.Serializable;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import org.apache.commons.lang3.ArrayUtils;
@@ -49,11 +51,11 @@ import com.google.common.collect.Sets;
public class DataModelDesc extends RootPersistentEntity {
private static final Logger logger = LoggerFactory.getLogger(DataModelDesc.class);
- public static enum TableKind implements Serializable{
+ public static enum TableKind implements Serializable {
FACT, LOOKUP
}
- public static enum RealizationCapacity implements Serializable{
+ public static enum RealizationCapacity implements Serializable {
SMALL, MEDIUM, LARGE
}
@@ -295,6 +297,7 @@ public class DataModelDesc extends RootPersistentEntity {
initJoinTablesForUpgrade();
initTableAlias(tables);
initJoinColumns();
+ reorderJoins(tables);
initJoinsTree();
initDimensionsAndMetrics();
initPartitionDesc();
@@ -452,6 +455,42 @@ public class DataModelDesc extends RootPersistentEntity {
joinsTree = new JoinsTree(rootFactTableRef, joins);
}
+ private void reorderJoins(Map<String, TableDesc> tables) { // reorders joinTables breadth-first from the root fact table, so each join comes after the join that introduced its FK-side table
+ if (joinTables.length == 0) {
+ return;
+ }
+ // group joins by the alias of their FK-side table
+ Map<String, List<JoinTableDesc>> fkMap = Maps.newHashMap();
+ for (JoinTableDesc joinTable : joinTables) {
+ JoinDesc join = joinTable.getJoin();
+ String fkSideName = join.getFKSide().getAlias(); // keyed by alias
+ if (fkMap.containsKey(fkSideName)) {
+ fkMap.get(fkSideName).add(joinTable);
+ } else {
+ List<JoinTableDesc> joinTableList = Lists.newArrayList();
+ joinTableList.add(joinTable);
+ fkMap.put(fkSideName, joinTableList);
+ }
+ }
+ // breadth-first walk from the root fact table; joins unreachable from it are never enqueued
+ JoinTableDesc[] orderedJoinTables = new JoinTableDesc[joinTables.length]; // NOTE(review): unreachable joins would leave null slots at the end
+ int orderedIndex = 0;
+
+ Queue<JoinTableDesc> joinTableBuff = new ArrayDeque<JoinTableDesc>();
+ TableDesc rootDesc = tables.get(rootFactTable);
+ joinTableBuff.addAll(fkMap.get(rootDesc.getName())); // NOTE(review): fkMap is keyed by alias but looked up by table name; if they differ (or no join hangs off the root) get() returns null and addAll throws NullPointerException
+ while (!joinTableBuff.isEmpty()) {
+ JoinTableDesc head = joinTableBuff.poll();
+ orderedJoinTables[orderedIndex++] = head;
+ String headAlias = head.getJoin().getPKSide().getAlias(); // this join's PK side is the FK side of the next level
+ if (fkMap.containsKey(headAlias)) {
+ joinTableBuff.addAll(fkMap.get(headAlias));
+ }
+ }
+ // NOTE(review): the "illegal Joins" check removed from JoinsTree in this commit is not re-done here; consider checking orderedIndex == joinTables.length
+ joinTables = orderedJoinTables;
+ }
+
private boolean validate() {
// ensure no dup between dimensions/metrics
http://git-wip-us.apache.org/repos/asf/kylin/blob/08600484/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index c6df52e..8e2192f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -19,17 +19,13 @@
package org.apache.kylin.metadata.model;
import java.io.Serializable;
-import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
public class JoinsTree implements Serializable {
private static final long serialVersionUID = 1L;
@@ -44,41 +40,11 @@ public class JoinsTree implements Serializable {
Preconditions.checkState(col.isQualified());
}
- // Walk through joins to build FK table to joins mapping
- HashMap<String, List<JoinDesc>> fkJoinMap = Maps.newHashMap();
- int joinCount = 0;
+ tableChains.put(rootTable.getAlias(), new Chain(rootTable, null, null));
for (JoinDesc join : joins) {
- joinCount++;
- String fkSideAlias = join.getFKSide().getAlias();
- if (fkJoinMap.containsKey(fkSideAlias)) {
- fkJoinMap.get(fkSideAlias).add(join);
- } else {
- List<JoinDesc> joinDescList = Lists.newArrayList(join);
- fkJoinMap.put(fkSideAlias, joinDescList);
- }
- }
-
- // Width-first build tree (tableChains)
- Queue<Chain> chainBuff = new ArrayDeque<Chain>();
- chainBuff.add(new Chain(rootTable, null, null));
- int chainCount = 0;
- while (!chainBuff.isEmpty()) {
- Chain chain = chainBuff.poll();
- String pkSideAlias = chain.table.getAlias();
- chainCount++;
- tableChains.put(pkSideAlias, chain);
-
- // this round pk side is next round's fk side
- if (fkJoinMap.containsKey(pkSideAlias)) {
- for (JoinDesc join : fkJoinMap.get(pkSideAlias)) {
- chainBuff.add(new Chain(join.getPKSide(), join, chain));
- }
- }
- }
-
- // if join count not match (chain count - 1), there must be some join not take effect
- if (joinCount != (chainCount - 1)) {
- throw new IllegalArgumentException("There's some illegal Joins, please check your model");
+ TableRef pkSide = join.getPKSide();
+ Chain fkSide = tableChains.get(join.getFKSide().getAlias());
+ tableChains.put(pkSide.getAlias(), new Chain(pkSide, join, fkSide));
}
}
[09/21] kylin git commit: KYLIN-2514 make left and inner model align
Posted by li...@apache.org.
KYLIN-2514 make left and inner model align
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8b70fa52
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8b70fa52
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8b70fa52
Branch: refs/heads/KYLIN-2501
Commit: 8b70fa5210c7c2f95086dd2311d373277161d4ed
Parents: 14b96a8
Author: Roger Shi <ro...@hotmail.com>
Authored: Fri Mar 17 22:19:16 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 17 23:28:15 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/metadata/model/JoinsTree.java | 3 ++-
.../model_desc/ci_inner_join_model.json | 28 ++++++++++----------
2 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/8b70fa52/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index 3c876a0..224788c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -19,6 +19,7 @@
package org.apache.kylin.metadata.model;
import java.io.Serializable;
+import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -59,7 +60,7 @@ public class JoinsTree implements Serializable {
}
// Width-first build tree (tableChains)
- Queue<Chain> chainBuff = Queues.newArrayDeque();
+ Queue<Chain> chainBuff = new ArrayDeque<Chain>();
chainBuff.add(new Chain(rootTable, null, null));
int chainCount = 0;
while (!chainBuff.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/8b70fa52/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json b/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json
index 19cf721..b79d293 100644
--- a/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json
+++ b/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json
@@ -31,20 +31,6 @@
}
},
{
- "table": "DEFAULT.TEST_ACCOUNT",
- "alias": "SELLER_ACCOUNT",
- "kind": "FACT",
- "join": {
- "type": "INNER",
- "primary_key": [
- "SELLER_ACCOUNT.ACCOUNT_ID"
- ],
- "foreign_key": [
- "TEST_KYLIN_FACT.SELLER_ID"
- ]
- }
- },
- {
"table": "EDW.TEST_CAL_DT",
"join": {
"type": "INNER",
@@ -119,6 +105,20 @@
"SELLER_ACCOUNT.ACCOUNT_COUNTRY"
]
}
+ },
+ {
+ "table": "DEFAULT.TEST_ACCOUNT",
+ "alias": "SELLER_ACCOUNT",
+ "kind": "FACT",
+ "join": {
+ "type": "INNER",
+ "primary_key": [
+ "SELLER_ACCOUNT.ACCOUNT_ID"
+ ],
+ "foreign_key": [
+ "TEST_KYLIN_FACT.SELLER_ID"
+ ]
+ }
}
],
"dimensions": [
[06/21] kylin git commit: minor, job diag support different hadoop env
Posted by li...@apache.org.
minor, job diag support different hadoop env
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6c800d6e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6c800d6e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6c800d6e
Branch: refs/heads/KYLIN-2501
Commit: 6c800d6e825c36ebe677b19efd7f02a7e3b9509b
Parents: 2ab9bf2
Author: lidongsjtu <li...@apache.org>
Authored: Thu Mar 16 16:48:02 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Mar 16 17:22:31 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/tool/MrJobInfoExtractor.java | 51 ++++++++++++++------
1 file changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c800d6e/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index b9bf2de..ca4c7e1 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class MrJobInfoExtractor extends AbstractInfoExtractor {
@@ -59,6 +61,11 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
private static final int HTTP_RETRY = 3;
+ private Map<String, String> nodeInfoMap = Maps.newHashMap();
+
+ private String jobHistoryUrlBase;
+ private String yarnMasterUrlBase;
+
public MrJobInfoExtractor() {
packageType = "MR";
@@ -71,14 +78,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
extractor.execute(args);
}
- private String getRestCheckUrl() {
+ private void extractRestCheckUrl() {
KylinConfig config = KylinConfig.getInstanceFromEnv();
final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
- Pattern pattern = Pattern.compile("(http://)(.*):.*");
+ Pattern pattern = Pattern.compile("(http://)([^:]*):([^/])*.*");
if (yarnStatusCheckUrl != null) {
Matcher m = pattern.matcher(yarnStatusCheckUrl);
if (m.matches()) {
- return m.group(1) + m.group(2) + ":19888";
+ jobHistoryUrlBase = m.group(1) + m.group(2) + ":19888";
+ yarnMasterUrlBase = m.group(1) + m.group(2) + ":" + m.group(3);
}
}
logger.info("kylin.engine.mr.yarn-check-status-url" + " is not set, read from hadoop configuration");
@@ -91,14 +99,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
}
if (StringUtils.isEmpty(rmWebHost)) {
- return null;
+ return;
}
if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
rmWebHost = "http://" + rmWebHost;
}
Matcher m = pattern.matcher(rmWebHost);
Preconditions.checkArgument(m.matches(), "Yarn master URL not found.");
- return m.group(1) + m.group(2) + ":19888";
+ yarnMasterUrlBase = rmWebHost;
+ jobHistoryUrlBase = m.group(1) + HAUtil.getConfValueForRMInstance("mapreduce.jobhistory.webapp.address", m.group(2) + ":19888", conf);
}
private String getHttpResponse(String url) {
@@ -122,7 +131,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
return msg;
}
- private void extractTaskDetail(String taskId, String nodeId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
+ private void extractTaskDetail(String taskId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
try {
if (StringUtils.isEmpty(taskId)) {
return;
@@ -137,8 +146,9 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
String succAttemptId = taskAttempt.textValue();
String attemptInfo = saveHttpResponseQuietly(new File(destDir, "task_attempts.json"), taskUrlBase + "/attempts/" + succAttemptId);
- JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt").path("assignedContainerId");
- String containerId = attemptAttempt.textValue();
+ JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt");
+ String containerId = attemptAttempt.get("assignedContainerId").textValue();
+ String nodeId = nodeInfoMap.get(attemptAttempt.get("nodeHttpAddress").textValue());
// save task counters
saveHttpResponseQuietly(new File(destDir, "task_counters.json"), taskUrlBase + "/counters");
@@ -173,16 +183,25 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
try {
boolean includeTaskDetails = optionsHelper.hasOption(OPTION_INCLUDE_DETAILS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_DETAILS)) : true;
String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
- String jobUrlBase = getRestCheckUrl();
- String jobUrlPrefix = jobUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+ extractRestCheckUrl();
+
+ Preconditions.checkNotNull(jobHistoryUrlBase);
+ Preconditions.checkNotNull(yarnMasterUrlBase);
+
+ String jobUrlPrefix = jobHistoryUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+
+ // cache node info
+ String nodeUrl = yarnMasterUrlBase + "/ws/v1/cluster/nodes";
+ String nodeResponse = getHttpResponse(nodeUrl);
+ JsonNode nodes = new ObjectMapper().readTree(nodeResponse).path("nodes").path("node");
+ for (JsonNode node : nodes) {
+ nodeInfoMap.put(node.path("nodeHTTPAddress").textValue(), node.path("id").textValue());
+ }
// save mr job stats
String jobResponse = saveHttpResponseQuietly(new File(exportDir, "job.json"), jobUrlPrefix);
String user = new ObjectMapper().readTree(jobResponse).path("job").path("user").textValue();
- String jobAttemptResponse = saveHttpResponseQuietly(new File(exportDir, "job_attempts.json"), jobUrlPrefix + "/jobattempts");
- String nodeId = new ObjectMapper().readTree(jobAttemptResponse).path("jobAttempts").path("jobAttempt").get(0).path("nodeId").textValue();
-
// save mr job conf
saveHttpResponseQuietly(new File(exportDir, "job_conf.json"), jobUrlPrefix + "/conf");
@@ -191,7 +210,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
// save task details
if (includeTaskDetails) {
- extractTaskDetails(exportDir, jobUrlPrefix, jobUrlBase, nodeId, user);
+ extractTaskDetails(exportDir, jobUrlPrefix, jobHistoryUrlBase, user);
}
} catch (Exception e) {
@@ -199,7 +218,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
}
}
- private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String nodeId, String user) {
+ private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String user) {
try {
String tasksUrl = jobUrlPrefix + "/tasks/";
String tasksResponse = saveHttpResponseQuietly(new File(exportDir, "job_tasks.json"), tasksUrl);
@@ -324,7 +343,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
File tasksDir = new File(exportDir, "tasks");
FileUtils.forceMkdir(tasksDir);
for (String taskId : selectedTaskIds) {
- extractTaskDetail(taskId, nodeId, user, tasksDir, tasksUrl, jobUrlBase);
+ extractTaskDetail(taskId, user, tasksDir, tasksUrl, jobUrlBase);
}
} catch (Exception e) {
logger.warn("Failed to get mr tasks rest response.", e);
[03/21] kylin git commit: KYLIN-2504 keep the engine_type property when cloning a cube
Posted by li...@apache.org.
KYLIN-2504 keep the engine_type (and storage_type) properties when cloning a cube
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dde297e4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dde297e4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dde297e4
Branch: refs/heads/KYLIN-2501
Commit: dde297e48a6886f662cd153c6f834f7e26560522
Parents: 32034fc
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 15 22:25:51 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 15 22:25:51 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/rest/controller/CubeController.java | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/dde297e4/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index da44aec..8cdfcfb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -361,10 +360,7 @@ public class CubeController extends BasicController {
CubeDesc cubeDesc = cube.getDescriptor();
CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
- KylinConfig config = cubeService.getConfig();
newCubeDesc.setName(newCubeName);
- newCubeDesc.setEngineType(config.getDefaultCubeEngine());
- newCubeDesc.setStorageType(config.getDefaultStorageEngine());
CubeInstance newCube;
try {
[13/21] kylin git commit: KYLIN-2434 support config kylin.source.hive.database-for-flat-table in Spark cubing
Posted by li...@apache.org.
KYLIN-2434 support config kylin.source.hive.database-for-flat-table in Spark cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3ae6f9c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3ae6f9c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3ae6f9c9
Branch: refs/heads/KYLIN-2501
Commit: 3ae6f9c9120360cdc9fc238a2a9208fa9813aea6
Parents: 98664f0
Author: shaofengshi <sh...@apache.org>
Authored: Sat Mar 18 10:39:57 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Mar 19 09:47:47 2017 +0800
----------------------------------------------------------------------
.../kylin/engine/spark/SparkBatchCubingJobBuilder2.java | 2 +-
.../org/apache/kylin/engine/spark/SparkCubingByLayer.java | 9 +++++----
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ae6f9c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index e0b3e6c..66b154d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,7 +47,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
- sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName());
+ sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ae6f9c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f7ed2d0..f70fd30 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -77,22 +77,23 @@ import java.util.List;
/**
+ * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
*/
public class SparkCubingByLayer extends AbstractApplication implements Serializable {
protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class);
- public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
private Options options;
public SparkCubingByLayer() {
options = new Options();
- options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_INPUT_TABLE);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_CONF_PATH);
@@ -134,7 +135,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
- final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
@@ -154,7 +155,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
HiveContext sqlContext = new HiveContext(sc.sc());
- final DataFrame intermediateTable = sqlContext.table(envConfig.getHiveDatabaseForIntermediateTable() + "." + hiveTable);
+ final DataFrame intermediateTable = sqlContext.table(hiveTable);
final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();