You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/31 06:59:51 UTC

[01/21] kylin git commit: KYLIN-2508, Convert the time to UTC when setting the range for building a cube [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2501 498aa2562 -> d045a045a (forced update)


KYLIN-2508, Convert the time to UTC when setting the range for building a cube

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3e6992da
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3e6992da
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3e6992da

Branch: refs/heads/KYLIN-2501
Commit: 3e6992da3b844d4642985f732c3269e354fed88c
Parents: 5155994
Author: luguosheng <55...@qq.com>
Authored: Wed Mar 15 17:21:39 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 15 18:35:50 2017 +0800

----------------------------------------------------------------------
 webapp/app/js/directives/directives.js | 77 +++++++++++++++--------------
 1 file changed, 39 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3e6992da/webapp/app/js/directives/directives.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/directives/directives.js b/webapp/app/js/directives/directives.js
index ca08493..bca3b03 100644
--- a/webapp/app/js/directives/directives.js
+++ b/webapp/app/js/directives/directives.js
@@ -249,49 +249,50 @@ KylinApp.directive('kylinPagination', function ($parse, $q) {
       }
     };
   }).directive('dateTimepickerTimezone', function () {
-  // this directive workaround to convert GMT0 timestamp to GMT date for datepicker
-  return {
-    restrict: 'A',
-    priority: 1,
-    require: 'ngModel',
-    link: function (scope, element, attrs, ctrl) {
-      ctrl.$formatters.push(function (value) {
-
-        //set null for 0
-        if(value===0){
-          return '';
-        }
-
-        //return value;
-        var newDate = new Date(value + (60000 * new Date().getTimezoneOffset()));
-
-        var year = newDate.getFullYear();
-        var month = (newDate.getMonth()+1)<10?'0'+(newDate.getMonth()+1):(newDate.getMonth()+1);
-        var date = newDate.getDate()<10?'0'+newDate.getDate():newDate.getDate();
+    return {
+      restrict: 'A',
+      priority: 1,
+      require: 'ngModel',
+      link: function (scope, element, attrs, ctrl) {
+        ctrl.$formatters.push(function (value) {
 
-        var hour = newDate.getHours()<10?'0'+newDate.getHours():newDate.getHours();
-        var mins = newDate.getMinutes()<10?'0'+newDate.getMinutes():newDate.getMinutes();
-        var seconds = newDate.getSeconds()<10?'0'+newDate.getSeconds():newDate.getSeconds();
+          //set null for 0
+          if(value===0){
+            return '';
+          }
 
-        var viewVal = year+"-"+month+"-"+date+" "+hour+":"+mins+":"+seconds;
-        return viewVal;
-      });
+          //return value;
+          var newDate = new Date(value);
+          var year = newDate.getUTCFullYear();
+          var month = (newDate.getUTCMonth()+1)<10?'0'+(newDate.getUTCMonth()+1):(newDate.getUTCMonth()+1);
+          var date = newDate.getUTCDate()<10?'0'+newDate.getUTCDate():newDate.getUTCDate();
+          var hour = newDate.getUTCHours()<10?'0'+newDate.getUTCHours():newDate.getUTCHours();
+          var mins = newDate.getUTCMinutes()<10?'0'+newDate.getUTCMinutes():newDate.getUTCMinutes();
+          var seconds = newDate.getUTCSeconds()<10?'0'+newDate.getUTCSeconds():newDate.getUTCSeconds();
+          var viewVal = year+"-"+month+"-"+date+" "+hour+":"+mins+":"+seconds;
+          return viewVal;
+        });
 
-      ctrl.$parsers.push(function (value) {
-        var date;
-        if(/^\d{4}-\d{1,2}-\d{1,2}(\s+\d{1,2}:\d{1,2}:\d{1,2})?$/.test(value)) {
-          date=new Date(value);
-          if(!date||date&&!date.getTime()){
-            return value;
+        ctrl.$parsers.push(function (value) {
+          var date;
+          if(/^\d{4}-\d{1,2}-\d{1,2}(\s+\d{1,2}:\d{1,2}:\d{1,2})?$/.test(value)) {
+            date=new Date(value);
+            if(!date||date&&!date.getTime()){
+              return value;
+            }else{
+              var dateSplit=value.replace(/^\s+|\s+$/,'').replace(/\s+/,'-').split(/[:-]/);
+              var resultDate=[];
+              for(var i=0;i<6;i++){
+                resultDate[i]=dateSplit[i]||0;
+              }
+              return Date.UTC(resultDate[0],resultDate[1]-1,resultDate[2],resultDate[3],resultDate[4],resultDate[5]);
+            }
           }else{
-            return date.getTime()-(60000 * date.getTimezoneOffset());
+            return value;
           }
-        }else{
-          return value;
-        }
-      });
-    }
-  };
+        });
+      }
+    };
 }).directive("parametertree", function($compile) {
     return {
       restrict: "E",


[07/21] kylin git commit: KYLIN-2280 minor, reset to the initial config before modifying ports

Posted by li...@apache.org.
KYLIN-2280 minor, reset to the initial config before modifying ports

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/40249fee
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/40249fee
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/40249fee

Branch: refs/heads/KYLIN-2501
Commit: 40249fee78a8a078d461ea66009111082732d27b
Parents: 6c800d6
Author: xiefan46 <95...@qq.com>
Authored: Fri Mar 17 11:27:08 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 17 12:53:11 2017 +0800

----------------------------------------------------------------------
 build/bin/kylin_port_replace_util.sh | 15 ++++++++++-----
 build/script/download-tomcat.sh      |  1 +
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/40249fee/build/bin/kylin_port_replace_util.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin_port_replace_util.sh b/build/bin/kylin_port_replace_util.sh
index a51e60d..366d8d0 100755
--- a/build/bin/kylin_port_replace_util.sh
+++ b/build/bin/kylin_port_replace_util.sh
@@ -31,6 +31,7 @@ else
 fi
 
 #variables
+TOMCAT_INIT_FILE="${KYLIN_HOME}/tomcat/conf/server.xml.init"
 TOMCAT_BACKUP_FILE="${KYLIN_HOME}/tomcat/conf/server.xml.backup"
 TOMCAT_CONFIG_FILE="${KYLIN_HOME}/tomcat/conf/server.xml"
 KYLIN_CONFIG_FILE="${KYLIN_HOME}/conf/kylin.properties"
@@ -49,6 +50,15 @@ then
         exit 1
     fi
 
+
+    #backup tomccat file
+    if [ ! -f ${TOMCAT_BACKUP_FILE} ]; then
+        cp -f ${TOMCAT_CONFIG_FILE} ${TOMCAT_BACKUP_FILE}
+    fi
+
+    #force reset
+    cp -f ${TOMCAT_INIT_FILE} ${TOMCAT_CONFIG_FILE} #reset if exist
+
     #back or reset
     if [ ! -f ${KYLIN_BACKUP_FILE} ]; then  #backup if not exist
         cp -f ${KYLIN_CONFIG_FILE} ${KYLIN_BACKUP_FILE}
@@ -56,11 +66,6 @@ then
         cp -r ${KYLIN_BACKUP_FILE} ${KYLIN_CONFIG_FILE} #reset if exist
     fi
 
-    if [ ! -f ${TOMCAT_BACKUP_FILE} ]; then  #backup if not exist
-        cp -f ${TOMCAT_CONFIG_FILE} ${TOMCAT_BACKUP_FILE}
-    else
-        cp -r ${TOMCAT_BACKUP_FILE} ${TOMCAT_CONFIG_FILE} #reset if exist
-    fi
 
     #replace ports in kylin.properties
     new_kylin_port=`expr ${KYLIN_DEFAULT_PORT} + ${OFFSET}`

http://git-wip-us.apache.org/repos/asf/kylin/blob/40249fee/build/script/download-tomcat.sh
----------------------------------------------------------------------
diff --git a/build/script/download-tomcat.sh b/build/script/download-tomcat.sh
index 964400f..b0ff137 100755
--- a/build/script/download-tomcat.sh
+++ b/build/script/download-tomcat.sh
@@ -51,6 +51,7 @@ rm -rf build/tomcat/webapps/*
 mv build/tomcat/conf/server.xml build/tomcat/conf/server.xml.bak
 mv build/tomcat/conf/context.xml build/tomcat/conf/context.xml.bak
 cp build/deploy/server.xml build/tomcat/conf/server.xml
+cp build/deploy/server.xml build/tomcat/conf/server.xml.init
 echo "server.xml overwritten..."
 cp build/deploy/context.xml build/tomcat/conf/context.xml
 echo "context.xml overwritten..."


[02/21] kylin git commit: KYLIN-2344 remove spark-defaults.conf

Posted by li...@apache.org.
KYLIN-2344 remove spark-defaults.conf


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/32034fc1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/32034fc1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/32034fc1

Branch: refs/heads/KYLIN-2501
Commit: 32034fc1d0cb96156da8c4b49c35f2631a59079b
Parents: 3e6992d
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 15 20:17:01 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 15 20:17:19 2017 +0800

----------------------------------------------------------------------
 build/deploy/spark-defaults.conf | 5 -----
 build/script/download-spark.sh   | 2 --
 2 files changed, 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/32034fc1/build/deploy/spark-defaults.conf
----------------------------------------------------------------------
diff --git a/build/deploy/spark-defaults.conf b/build/deploy/spark-defaults.conf
deleted file mode 100644
index 78a4bc9..0000000
--- a/build/deploy/spark-defaults.conf
+++ /dev/null
@@ -1,5 +0,0 @@
-spark.yarn.submit.file.replication=1
-spark.yarn.max.executor.failures=3
-spark.driver.extraJavaOptions=-Dhdp.version=current
-spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-spark.executor.extraJavaOptions=-Dhdp.version=current

http://git-wip-us.apache.org/repos/asf/kylin/blob/32034fc1/build/script/download-spark.sh
----------------------------------------------------------------------
diff --git a/build/script/download-spark.sh b/build/script/download-spark.sh
index 3ea7d08..b73331e 100755
--- a/build/script/download-spark.sh
+++ b/build/script/download-spark.sh
@@ -53,5 +53,3 @@ rm -rf build/spark/lib/spark-examples-*
 rm -rf build/spark/examples
 rm -rf build/spark/data
 rm -rf build/spark/R
-
-cp build/deploy/spark-defaults.conf build/spark/conf/spark-defaults.conf
\ No newline at end of file


[17/21] kylin git commit: #KYLIN-490 support multi-column distinct count

Posted by li...@apache.org.
#KYLIN-490 support multi-column distinct count

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/636282db
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/636282db
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/636282db

Branch: refs/heads/KYLIN-2501
Commit: 636282db889973fe29269b43e417414effb68b76
Parents: f72a3f6
Author: Roger Shi <ro...@hotmail.com>
Authored: Wed Mar 22 19:22:22 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 27 15:36:07 2017 +0800

----------------------------------------------------------------------
 .../BitmapIntersectDistinctCountAggFunc.java    |   9 +-
 .../measure/percentile/PercentileAggFunc.java   |   9 +-
 .../kylin/metadata/model/FunctionDesc.java      |  62 ++++++---
 .../kylin/metadata/model/ParameterDesc.java     | 135 +++++++++++++++++--
 .../kylin/query/relnode/OLAPAggregateRel.java   |  86 +++++++++---
 5 files changed, 250 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
index cd4d306..a1e2665 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -17,6 +17,8 @@
 */
 package org.apache.kylin.measure.bitmap;
 
+import org.apache.kylin.measure.ParamAsMeasureCount;
+
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,9 +29,14 @@ import java.util.Map;
  * Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps
  *          requires an bitmap count distinct measure of uuid, and an dimension of event
  */
-public class BitmapIntersectDistinctCountAggFunc {
+public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount {
     private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
 
+    @Override
+    public int getParamAsMeasureCount() {
+        return -2;
+    }
+
     public static class RetentionPartialResult {
         Map<Object, BitmapCounter> map;
         List keyList;

http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
index ad02019..d3cec8f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.measure.percentile;
 
-public class PercentileAggFunc {
+import org.apache.kylin.measure.ParamAsMeasureCount;
+
+public class PercentileAggFunc implements ParamAsMeasureCount{
     public static PercentileCounter init() {
         return null;
     }
@@ -41,4 +43,9 @@ public class PercentileAggFunc {
     public static double result(PercentileCounter counter) {
         return counter == null ? 0L : counter.getResultEstimate();
     }
+
+    @Override
+    public int getParamAsMeasureCount() {
+        return 1;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index cbd7574..61c5fac 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,22 +18,26 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
 
 /**
  */
@@ -48,7 +52,7 @@ public class FunctionDesc implements Serializable {
         r.returnDataType = DataType.getType(returnType);
         return r;
     }
-    
+
     public static final String FUNC_SUM = "SUM";
     public static final String FUNC_MIN = "MIN";
     public static final String FUNC_MAX = "MAX";
@@ -95,7 +99,7 @@ public class FunctionDesc implements Serializable {
             }
         }
 
-        if(parameter != null)
+        if (parameter != null)
             parameter.setColRefs(colRefs);
     }
 
@@ -140,6 +144,8 @@ public class FunctionDesc implements Serializable {
             return getParameter().getValue();
         } else if (isCount()) {
             return "_KY_" + "COUNT__"; // ignores parameter, count(*), count(1), count(col) are all the same
+        } else if (isCountDistinct()) {
+            return "_KY_" + getFullExpressionInAlphabetOrder().replaceAll("[(),. ]", "_");
         } else {
             return "_KY_" + getFullExpression().replaceAll("[(),. ]", "_");
         }
@@ -197,6 +203,25 @@ public class FunctionDesc implements Serializable {
         return sb.toString();
     }
 
+    /**
+     * Parameters' name appears in alphabet order.
+     * This method is used for funcs whose parameters appear in arbitrary order
+     */
+    public String getFullExpressionInAlphabetOrder() {
+        StringBuilder sb = new StringBuilder(expression);
+        sb.append("(");
+        ParameterDesc localParam = parameter;
+        List<String> flatParams = Lists.newArrayList();
+        while (localParam != null) {
+            flatParams.add(localParam.getValue());
+            localParam = localParam.getNextParameter();
+        }
+        Collections.sort(flatParams);
+        sb.append(Joiner.on(",").join(flatParams));
+        sb.append(")");
+        return sb.toString();
+    }
+
     public boolean isDimensionAsMetric() {
         return isDimensionAsMetric;
     }
@@ -264,13 +289,20 @@ public class FunctionDesc implements Serializable {
                 return false;
         } else if (!expression.equals(other.expression))
             return false;
-        // NOTE: don't check the parameter of count()
-        if (isCount() == false) {
+        if (isCountDistinct()) {
+            // for count distinct func, param's order doesn't matter
+            if (parameter == null) {
+                if (other.parameter != null)
+                    return false;
+            } else {
+                return parameter.equalInArbitraryOrder(other.parameter);
+            }
+        } else if (!isCount()) { // NOTE: don't check the parameter of count()
             if (parameter == null) {
                 if (other.parameter != null)
                     return false;
             } else {
-                 if (!parameter.equals(other.parameter))
+                if (!parameter.equals(other.parameter))
                     return false;
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 8ad20a8..5ba2f14 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -18,17 +18,19 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Sets;
 
 /**
  */
@@ -38,9 +40,9 @@ public class ParameterDesc implements Serializable {
     public static ParameterDesc newInstance(Object... objs) {
         if (objs.length == 0)
             throw new IllegalArgumentException();
-        
+
         ParameterDesc r = new ParameterDesc();
-        
+
         Object obj = objs[0];
         if (obj instanceof TblColRef) {
             TblColRef col = (TblColRef) obj;
@@ -51,7 +53,7 @@ public class ParameterDesc implements Serializable {
             r.type = FunctionDesc.PARAMETER_TYPE_CONSTANT;
             r.value = (String) obj;
         }
-        
+
         if (objs.length >= 2) {
             r.nextParameter = newInstance(Arrays.copyOfRange(objs, 1, objs.length));
             if (r.nextParameter.colRefs.size() > 0) {
@@ -63,7 +65,7 @@ public class ParameterDesc implements Serializable {
         }
         return r;
     }
-    
+
     @JsonProperty("type")
     private String type;
     @JsonProperty("value")
@@ -74,6 +76,15 @@ public class ParameterDesc implements Serializable {
     private ParameterDesc nextParameter;
 
     private List<TblColRef> colRefs = ImmutableList.of();
+    private Set<PlainParameter> plainParameters = null;
+
+    // Lazy evaluation
+    public Set<PlainParameter> getPlainParameters() {
+        if (plainParameters == null) {
+            plainParameters = PlainParameter.createFromParameterDesc(this);
+        }
+        return plainParameters;
+    }
 
     public String getType() {
         return type;
@@ -86,7 +97,7 @@ public class ParameterDesc implements Serializable {
     public String getValue() {
         return value;
     }
-    
+
     void setValue(String value) {
         this.value = value;
     }
@@ -94,7 +105,7 @@ public class ParameterDesc implements Serializable {
     public List<TblColRef> getColRefs() {
         return colRefs;
     }
-    
+
     void setColRefs(List<TblColRef> colRefs) {
         this.colRefs = colRefs;
     }
@@ -118,7 +129,7 @@ public class ParameterDesc implements Serializable {
 
         if (type != null ? !type.equals(that.type) : that.type != null)
             return false;
-        
+
         ParameterDesc p = this, q = that;
         int refi = 0, refj = 0;
         for (; p != null && q != null; p = p.nextParameter, q = q.nextParameter) {
@@ -138,10 +149,24 @@ public class ParameterDesc implements Serializable {
                     return false;
             }
         }
-        
+
         return p == null && q == null;
     }
 
+    public boolean equalInArbitraryOrder(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ParameterDesc that = (ParameterDesc) o;
+
+        Set<PlainParameter> thisPlainParams = this.getPlainParameters();
+        Set<PlainParameter> thatPlainParams = that.getPlainParameters();
+
+        return thisPlainParams.containsAll(thatPlainParams) && thatPlainParams.containsAll(thisPlainParams);
+    }
+
     @Override
     public int hashCode() {
         int result = type != null ? type.hashCode() : 0;
@@ -154,4 +179,88 @@ public class ParameterDesc implements Serializable {
         return "ParameterDesc [type=" + type + ", value=" + value + ", nextParam=" + nextParameter + "]";
     }
 
+    /**
+     * PlainParameter is created to present ParameterDesc in List style.
+     * Compared to ParameterDesc its advantage is:
+     * 1. easy to compare without considering order
+     * 2. easy to compare one by one
+     */
+    private static class PlainParameter {
+        private String type;
+        private String value;
+        private TblColRef colRef = null;
+
+        private PlainParameter() {
+        }
+
+        public boolean isColumnType() {
+            return FunctionDesc.PARAMETER_TYPE_COLUMN.equals(type);
+        }
+
+        static Set<PlainParameter> createFromParameterDesc(ParameterDesc parameterDesc) {
+            Set<PlainParameter> result = Sets.newHashSet();
+            ParameterDesc local = parameterDesc;
+            List<TblColRef> totalColRef = parameterDesc.colRefs;
+            Integer colIndex = 0;
+            while (local != null) {
+                if (local.isColumnType()) {
+                    result.add(createSingleColumnParameter(local, totalColRef.get(colIndex++)));
+                } else {
+                    result.add(createSingleValueParameter(local));
+                }
+                local = local.nextParameter;
+            }
+            return result;
+        }
+
+        static PlainParameter createSingleValueParameter(ParameterDesc parameterDesc) {
+            PlainParameter single = new PlainParameter();
+            single.type = parameterDesc.type;
+            single.value = parameterDesc.value;
+            return single;
+        }
+
+        static PlainParameter createSingleColumnParameter(ParameterDesc parameterDesc, TblColRef colRef) {
+            PlainParameter single = new PlainParameter();
+            single.type = parameterDesc.type;
+            single.value = parameterDesc.value;
+            single.colRef = colRef;
+            return single;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = type != null ? type.hashCode() : 0;
+            result = 31 * result + (colRef != null ? colRef.hashCode() : 0);
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PlainParameter that = (PlainParameter) o;
+
+            if (type != null ? !type.equals(that.type) : that.type != null)
+                return false;
+
+            if (this.isColumnType()) {
+                if (!that.isColumnType())
+                    return false;
+                if (!this.colRef.equals(that.colRef)) {
+                    return false;
+                }
+            } else {
+                if (that.isColumnType())
+                    return false;
+                if (!this.value.equals(that.value))
+                    return false;
+            }
+
+            return true;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/636282db/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 8d7c597..2c75a14 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.query.relnode;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -55,6 +56,7 @@ import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.ParamAsMeasureCount;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
@@ -71,6 +73,7 @@ import com.google.common.collect.Sets;
 public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
     private final static Map<String, String> AGGR_FUNC_MAP = new HashMap<String, String>();
+    private final static Map<String, Integer> AGGR_FUNC_PARAM_AS_MEASTURE_MAP = new HashMap<String, Integer>();
 
     static {
         AGGR_FUNC_MAP.put("SUM", "SUM");
@@ -84,6 +87,15 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         for (String udaf : udafFactories.keySet()) {
             AGGR_FUNC_MAP.put(udaf, udafFactories.get(udaf).getAggrFunctionName());
         }
+
+        Map<String, Class<?>> udafs = MeasureTypeFactory.getUDAFs();
+        for (String func : udafs.keySet()) {
+            try {
+                AGGR_FUNC_PARAM_AS_MEASTURE_MAP.put(func, ((ParamAsMeasureCount) (udafs.get(func).newInstance())).getParamAsMeasureCount());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     private static String getSqlFuncName(AggregateCall aggCall) {
@@ -235,12 +247,27 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         this.aggregations = new ArrayList<FunctionDesc>();
         for (AggregateCall aggCall : this.rewriteAggCalls) {
             ParameterDesc parameter = null;
+            // By default all args are included, UDFs can define their own in getParamAsMeasureCount method.
             if (!aggCall.getArgList().isEmpty()) {
-                // TODO: Currently only get the column of first param
-                int index = aggCall.getArgList().get(0);
-                TblColRef column = inputColumnRowType.getColumnByIndex(index);
-                if (!column.isInnerColumn()) {
-                    parameter = ParameterDesc.newInstance(column);
+                List<TblColRef> columns = Lists.newArrayList();
+                String funcName = getSqlFuncName(aggCall);
+                int columnsCount = aggCall.getArgList().size();
+                if (AGGR_FUNC_PARAM_AS_MEASTURE_MAP.containsKey(funcName)) {
+                    int asMeasureCnt = AGGR_FUNC_PARAM_AS_MEASTURE_MAP.get(funcName);
+                    if (asMeasureCnt > 0) {
+                        columnsCount = asMeasureCnt;
+                    } else {
+                        columnsCount += asMeasureCnt;
+                    }
+                }
+                for (Integer index : aggCall.getArgList().subList(0, columnsCount)) {
+                    TblColRef column = inputColumnRowType.getColumnByIndex(index);
+                    if (!column.isInnerColumn()) {
+                        columns.add(column);
+                    }
+                }
+                if (!columns.isEmpty()) {
+                    parameter = ParameterDesc.newInstance(columns.toArray(new TblColRef[columns.size()]));
                 }
             }
             String expression = getAggrFuncName(aggCall);
@@ -341,10 +368,11 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
             AggregateCall aggCall = this.rewriteAggCalls.get(i);
             if (!aggCall.getArgList().isEmpty()) {
-                int index = aggCall.getArgList().get(0);
-                TblColRef column = inputColumnRowType.getColumnByIndex(index);
-                if (!column.isInnerColumn()) {
-                    this.context.metricsColumns.add(column);
+                for (Integer index : aggCall.getArgList()) {
+                    TblColRef column = inputColumnRowType.getColumnByIndex(index);
+                    if (!column.isInnerColumn()) {
+                        this.context.metricsColumns.add(column);
+                    }
                 }
             }
         }
@@ -385,18 +413,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
             return aggCall;
         }
 
-        // rebuild parameters
-        List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
-        if (func.needRewriteField()) {
-            RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
-            if (newArgList.isEmpty()) {
-                newArgList.add(field.getIndex());
-            } else {
-                // only the first column got overwritten
-                newArgList.set(0, field.getIndex());
-            }
-        }
-
         // rebuild function
         String callName = getSqlFuncName(aggCall);
         RelDataType fieldType = aggCall.getType();
@@ -408,12 +424,40 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
             newAgg = createCustomAggFunction(callName, fieldType, udafMap.get(callName));
         }
 
+        // rebuild parameters
+        List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
+        if (udafMap != null && udafMap.containsKey(callName)) {
+            newArgList = truncArgList(newArgList, udafMap.get(callName));
+        }
+        if (func.needRewriteField()) {
+            RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
+            if (newArgList.isEmpty()) {
+                newArgList.add(field.getIndex());
+            } else {
+                // TODO: only the first column got overwritten
+                newArgList.set(0, field.getIndex());
+            }
+        }
+
         // rebuild aggregate call
         @SuppressWarnings("deprecation")
         AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName);
         return newAggCall;
     }
 
+    /**
+     * truncate Arg List according to UDAF's "add" method parameter count
+     */
+    private List<Integer> truncArgList(List<Integer> argList, Class<?> udafClazz) {
+        int argListLength = argList.size();
+        for (Method method : udafClazz.getMethods()) {
+            if (method.getName().equals("add")) {
+                argListLength = Math.min(method.getParameterTypes().length - 1, argListLength);
+            }
+        }
+        return argList.subList(0, argListLength);
+    }
+
     private SqlAggFunction createCustomAggFunction(String funcName, RelDataType returnType, Class<?> customAggFuncClz) {
         RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
         SqlIdentifier sqlIdentifier = new SqlIdentifier(funcName, new SqlParserPos(1, 1));


[18/21] kylin git commit: Merge commit '88a1c71dde855c693b230f67b92c4cd067d43b2b'

Posted by li...@apache.org.
Merge commit '88a1c71dde855c693b230f67b92c4cd067d43b2b'


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b1cc0dd6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b1cc0dd6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b1cc0dd6

Branch: refs/heads/KYLIN-2501
Commit: b1cc0dd643ebd55776a3f22b61ce33e2abf29d48
Parents: 636282d 88a1c71
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Mar 27 15:39:03 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 27 15:39:03 2017 +0800

----------------------------------------------------------------------
 .../kylin/measure/ParamAsMeasureCount.java      | 30 ++++++++++++++++++++
 .../resources/query/sql_distinct/query08.sql    | 24 ++++++++++++++++
 2 files changed, 54 insertions(+)
----------------------------------------------------------------------



[10/21] kylin git commit: minor fix unused import

Posted by li...@apache.org.
minor fix unused import


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/19c87e7a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/19c87e7a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/19c87e7a

Branch: refs/heads/KYLIN-2501
Commit: 19c87e7a80b95a16a930feb6d0281555cd1232b5
Parents: 8b70fa5
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Mar 17 23:33:42 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 17 23:39:22 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/metadata/model/JoinsTree.java     | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/19c87e7a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index 224788c..c6df52e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -30,11 +30,10 @@ import java.util.Queue;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 
-public class JoinsTree  implements Serializable {
+public class JoinsTree implements Serializable {
     private static final long serialVersionUID = 1L;
-    
+
     final Map<String, Chain> tableChains = new LinkedHashMap<>();
 
     public JoinsTree(TableRef rootTable, List<JoinDesc> joins) {
@@ -48,7 +47,7 @@ public class JoinsTree  implements Serializable {
         // Walk through joins to build FK table to joins mapping
         HashMap<String, List<JoinDesc>> fkJoinMap = Maps.newHashMap();
         int joinCount = 0;
-        for (JoinDesc join: joins) {
+        for (JoinDesc join : joins) {
             joinCount++;
             String fkSideAlias = join.getFKSide().getAlias();
             if (fkJoinMap.containsKey(fkSideAlias)) {
@@ -64,14 +63,14 @@ public class JoinsTree  implements Serializable {
         chainBuff.add(new Chain(rootTable, null, null));
         int chainCount = 0;
         while (!chainBuff.isEmpty()) {
-            Chain chain= chainBuff.poll();
+            Chain chain = chainBuff.poll();
             String pkSideAlias = chain.table.getAlias();
             chainCount++;
             tableChains.put(pkSideAlias, chain);
 
             // this round pk side is next round's fk side
             if (fkJoinMap.containsKey(pkSideAlias)) {
-                for (JoinDesc join: fkJoinMap.get(pkSideAlias)) {
+                for (JoinDesc join : fkJoinMap.get(pkSideAlias)) {
                     chainBuff.add(new Chain(join.getPKSide(), join, chain));
                 }
             }
@@ -149,7 +148,7 @@ public class JoinsTree  implements Serializable {
 
     public static class Chain implements Serializable {
         private static final long serialVersionUID = 1L;
-        
+
         TableRef table; // pk side
         JoinDesc join;
         Chain fkSide;


[16/21] kylin git commit: #KYLIN-490 support multiple column distinct count

Posted by li...@apache.org.
#KYLIN-490 support multiple column distinct count

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/88a1c71d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/88a1c71d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/88a1c71d

Branch: refs/heads/KYLIN-2501
Commit: 88a1c71dde855c693b230f67b92c4cd067d43b2b
Parents: f72a3f6
Author: Roger Shi <ro...@hotmail.com>
Authored: Wed Mar 22 19:22:22 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 27 15:20:40 2017 +0800

----------------------------------------------------------------------
 .../kylin/measure/ParamAsMeasureCount.java      |  30 +++++
 .../BitmapIntersectDistinctCountAggFunc.java    |   9 +-
 .../measure/percentile/PercentileAggFunc.java   |   9 +-
 .../kylin/metadata/model/FunctionDesc.java      |  62 ++++++---
 .../kylin/metadata/model/ParameterDesc.java     | 135 +++++++++++++++++--
 .../resources/query/sql_distinct/query08.sql    |  24 ++++
 .../kylin/query/relnode/OLAPAggregateRel.java   |  86 +++++++++---
 7 files changed, 304 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/measure/ParamAsMeasureCount.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/ParamAsMeasureCount.java b/core-metadata/src/main/java/org/apache/kylin/measure/ParamAsMeasureCount.java
new file mode 100644
index 0000000..b9bcd10
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/ParamAsMeasureCount.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+public interface ParamAsMeasureCount {
+    /**
+     * Get how many parameters are required to identify the measure
+     * Negative value is for var arguments function
+     * @return 0 ==> all parameters
+     *         positive number ==> parameter count
+     *         negative number ==> parameter count - required number
+     */
+    int getParamAsMeasureCount();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
index cd4d306..a1e2665 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -17,6 +17,8 @@
 */
 package org.apache.kylin.measure.bitmap;
 
+import org.apache.kylin.measure.ParamAsMeasureCount;
+
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,9 +29,14 @@ import java.util.Map;
  * Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps
  *          requires an bitmap count distinct measure of uuid, and an dimension of event
  */
-public class BitmapIntersectDistinctCountAggFunc {
+public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount {
     private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
 
+    @Override
+    public int getParamAsMeasureCount() {
+        return -2;
+    }
+
     public static class RetentionPartialResult {
         Map<Object, BitmapCounter> map;
         List keyList;

http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
index ad02019..d3cec8f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggFunc.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.measure.percentile;
 
-public class PercentileAggFunc {
+import org.apache.kylin.measure.ParamAsMeasureCount;
+
+public class PercentileAggFunc implements ParamAsMeasureCount{
     public static PercentileCounter init() {
         return null;
     }
@@ -41,4 +43,9 @@ public class PercentileAggFunc {
     public static double result(PercentileCounter counter) {
         return counter == null ? 0L : counter.getResultEstimate();
     }
+
+    @Override
+    public int getParamAsMeasureCount() {
+        return 1;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index cbd7574..61c5fac 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,22 +18,26 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
 
 /**
  */
@@ -48,7 +52,7 @@ public class FunctionDesc implements Serializable {
         r.returnDataType = DataType.getType(returnType);
         return r;
     }
-    
+
     public static final String FUNC_SUM = "SUM";
     public static final String FUNC_MIN = "MIN";
     public static final String FUNC_MAX = "MAX";
@@ -95,7 +99,7 @@ public class FunctionDesc implements Serializable {
             }
         }
 
-        if(parameter != null)
+        if (parameter != null)
             parameter.setColRefs(colRefs);
     }
 
@@ -140,6 +144,8 @@ public class FunctionDesc implements Serializable {
             return getParameter().getValue();
         } else if (isCount()) {
             return "_KY_" + "COUNT__"; // ignores parameter, count(*), count(1), count(col) are all the same
+        } else if (isCountDistinct()) {
+            return "_KY_" + getFullExpressionInAlphabetOrder().replaceAll("[(),. ]", "_");
         } else {
             return "_KY_" + getFullExpression().replaceAll("[(),. ]", "_");
         }
@@ -197,6 +203,25 @@ public class FunctionDesc implements Serializable {
         return sb.toString();
     }
 
+    /**
+     * Parameters' name appears in alphabet order.
+     * This method is used for funcs whose parameters appear in arbitrary order
+     */
+    public String getFullExpressionInAlphabetOrder() {
+        StringBuilder sb = new StringBuilder(expression);
+        sb.append("(");
+        ParameterDesc localParam = parameter;
+        List<String> flatParams = Lists.newArrayList();
+        while (localParam != null) {
+            flatParams.add(localParam.getValue());
+            localParam = localParam.getNextParameter();
+        }
+        Collections.sort(flatParams);
+        sb.append(Joiner.on(",").join(flatParams));
+        sb.append(")");
+        return sb.toString();
+    }
+
     public boolean isDimensionAsMetric() {
         return isDimensionAsMetric;
     }
@@ -264,13 +289,20 @@ public class FunctionDesc implements Serializable {
                 return false;
         } else if (!expression.equals(other.expression))
             return false;
-        // NOTE: don't check the parameter of count()
-        if (isCount() == false) {
+        if (isCountDistinct()) {
+            // for count distinct func, param's order doesn't matter
+            if (parameter == null) {
+                if (other.parameter != null)
+                    return false;
+            } else {
+                return parameter.equalInArbitraryOrder(other.parameter);
+            }
+        } else if (!isCount()) { // NOTE: don't check the parameter of count()
             if (parameter == null) {
                 if (other.parameter != null)
                     return false;
             } else {
-                 if (!parameter.equals(other.parameter))
+                if (!parameter.equals(other.parameter))
                     return false;
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 8ad20a8..5ba2f14 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -18,17 +18,19 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Sets;
 
 /**
  */
@@ -38,9 +40,9 @@ public class ParameterDesc implements Serializable {
     public static ParameterDesc newInstance(Object... objs) {
         if (objs.length == 0)
             throw new IllegalArgumentException();
-        
+
         ParameterDesc r = new ParameterDesc();
-        
+
         Object obj = objs[0];
         if (obj instanceof TblColRef) {
             TblColRef col = (TblColRef) obj;
@@ -51,7 +53,7 @@ public class ParameterDesc implements Serializable {
             r.type = FunctionDesc.PARAMETER_TYPE_CONSTANT;
             r.value = (String) obj;
         }
-        
+
         if (objs.length >= 2) {
             r.nextParameter = newInstance(Arrays.copyOfRange(objs, 1, objs.length));
             if (r.nextParameter.colRefs.size() > 0) {
@@ -63,7 +65,7 @@ public class ParameterDesc implements Serializable {
         }
         return r;
     }
-    
+
     @JsonProperty("type")
     private String type;
     @JsonProperty("value")
@@ -74,6 +76,15 @@ public class ParameterDesc implements Serializable {
     private ParameterDesc nextParameter;
 
     private List<TblColRef> colRefs = ImmutableList.of();
+    private Set<PlainParameter> plainParameters = null;
+
+    // Lazy evaluation
+    public Set<PlainParameter> getPlainParameters() {
+        if (plainParameters == null) {
+            plainParameters = PlainParameter.createFromParameterDesc(this);
+        }
+        return plainParameters;
+    }
 
     public String getType() {
         return type;
@@ -86,7 +97,7 @@ public class ParameterDesc implements Serializable {
     public String getValue() {
         return value;
     }
-    
+
     void setValue(String value) {
         this.value = value;
     }
@@ -94,7 +105,7 @@ public class ParameterDesc implements Serializable {
     public List<TblColRef> getColRefs() {
         return colRefs;
     }
-    
+
     void setColRefs(List<TblColRef> colRefs) {
         this.colRefs = colRefs;
     }
@@ -118,7 +129,7 @@ public class ParameterDesc implements Serializable {
 
         if (type != null ? !type.equals(that.type) : that.type != null)
             return false;
-        
+
         ParameterDesc p = this, q = that;
         int refi = 0, refj = 0;
         for (; p != null && q != null; p = p.nextParameter, q = q.nextParameter) {
@@ -138,10 +149,24 @@ public class ParameterDesc implements Serializable {
                     return false;
             }
         }
-        
+
         return p == null && q == null;
     }
 
+    public boolean equalInArbitraryOrder(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ParameterDesc that = (ParameterDesc) o;
+
+        Set<PlainParameter> thisPlainParams = this.getPlainParameters();
+        Set<PlainParameter> thatPlainParams = that.getPlainParameters();
+
+        return thisPlainParams.containsAll(thatPlainParams) && thatPlainParams.containsAll(thisPlainParams);
+    }
+
     @Override
     public int hashCode() {
         int result = type != null ? type.hashCode() : 0;
@@ -154,4 +179,88 @@ public class ParameterDesc implements Serializable {
         return "ParameterDesc [type=" + type + ", value=" + value + ", nextParam=" + nextParameter + "]";
     }
 
+    /**
+     * PlainParameter is created to present ParameterDesc in List style.
+     * Compared to ParameterDesc its advantage is:
+     * 1. easy to compare without considering order
+     * 2. easy to compare one by one
+     */
+    private static class PlainParameter {
+        private String type;
+        private String value;
+        private TblColRef colRef = null;
+
+        private PlainParameter() {
+        }
+
+        public boolean isColumnType() {
+            return FunctionDesc.PARAMETER_TYPE_COLUMN.equals(type);
+        }
+
+        static Set<PlainParameter> createFromParameterDesc(ParameterDesc parameterDesc) {
+            Set<PlainParameter> result = Sets.newHashSet();
+            ParameterDesc local = parameterDesc;
+            List<TblColRef> totalColRef = parameterDesc.colRefs;
+            Integer colIndex = 0;
+            while (local != null) {
+                if (local.isColumnType()) {
+                    result.add(createSingleColumnParameter(local, totalColRef.get(colIndex++)));
+                } else {
+                    result.add(createSingleValueParameter(local));
+                }
+                local = local.nextParameter;
+            }
+            return result;
+        }
+
+        static PlainParameter createSingleValueParameter(ParameterDesc parameterDesc) {
+            PlainParameter single = new PlainParameter();
+            single.type = parameterDesc.type;
+            single.value = parameterDesc.value;
+            return single;
+        }
+
+        static PlainParameter createSingleColumnParameter(ParameterDesc parameterDesc, TblColRef colRef) {
+            PlainParameter single = new PlainParameter();
+            single.type = parameterDesc.type;
+            single.value = parameterDesc.value;
+            single.colRef = colRef;
+            return single;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = type != null ? type.hashCode() : 0;
+            result = 31 * result + (colRef != null ? colRef.hashCode() : 0);
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PlainParameter that = (PlainParameter) o;
+
+            if (type != null ? !type.equals(that.type) : that.type != null)
+                return false;
+
+            if (this.isColumnType()) {
+                if (!that.isColumnType())
+                    return false;
+                if (!this.colRef.equals(that.colRef)) {
+                    return false;
+                }
+            } else {
+                if (that.isColumnType())
+                    return false;
+                if (!this.value.equals(that.value))
+                    return false;
+            }
+
+            return true;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/kylin-it/src/test/resources/query/sql_distinct/query08.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_distinct/query08.sql b/kylin-it/src/test/resources/query/sql_distinct/query08.sql
new file mode 100644
index 0000000..60f02e7
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_distinct/query08.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select cal_dt,
+ sum(price) as GMV, 
+ count(1) as TRANS_CNT, 
+ count(distinct seller_id, lstg_format_name) as DIST_SELLER_FORMAT
+ from test_kylin_fact 
+ group by cal_dt

http://git-wip-us.apache.org/repos/asf/kylin/blob/88a1c71d/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 8d7c597..2c75a14 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.query.relnode;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -55,6 +56,7 @@ import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.ParamAsMeasureCount;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
@@ -71,6 +73,7 @@ import com.google.common.collect.Sets;
 public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
     private final static Map<String, String> AGGR_FUNC_MAP = new HashMap<String, String>();
+    private final static Map<String, Integer> AGGR_FUNC_PARAM_AS_MEASTURE_MAP = new HashMap<String, Integer>();
 
     static {
         AGGR_FUNC_MAP.put("SUM", "SUM");
@@ -84,6 +87,15 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         for (String udaf : udafFactories.keySet()) {
             AGGR_FUNC_MAP.put(udaf, udafFactories.get(udaf).getAggrFunctionName());
         }
+
+        Map<String, Class<?>> udafs = MeasureTypeFactory.getUDAFs();
+        for (String func : udafs.keySet()) {
+            try {
+                AGGR_FUNC_PARAM_AS_MEASTURE_MAP.put(func, ((ParamAsMeasureCount) (udafs.get(func).newInstance())).getParamAsMeasureCount());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     private static String getSqlFuncName(AggregateCall aggCall) {
@@ -235,12 +247,27 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         this.aggregations = new ArrayList<FunctionDesc>();
         for (AggregateCall aggCall : this.rewriteAggCalls) {
             ParameterDesc parameter = null;
+            // By default all args are included, UDFs can define their own in getParamAsMeasureCount method.
             if (!aggCall.getArgList().isEmpty()) {
-                // TODO: Currently only get the column of first param
-                int index = aggCall.getArgList().get(0);
-                TblColRef column = inputColumnRowType.getColumnByIndex(index);
-                if (!column.isInnerColumn()) {
-                    parameter = ParameterDesc.newInstance(column);
+                List<TblColRef> columns = Lists.newArrayList();
+                String funcName = getSqlFuncName(aggCall);
+                int columnsCount = aggCall.getArgList().size();
+                if (AGGR_FUNC_PARAM_AS_MEASTURE_MAP.containsKey(funcName)) {
+                    int asMeasureCnt = AGGR_FUNC_PARAM_AS_MEASTURE_MAP.get(funcName);
+                    if (asMeasureCnt > 0) {
+                        columnsCount = asMeasureCnt;
+                    } else {
+                        columnsCount += asMeasureCnt;
+                    }
+                }
+                for (Integer index : aggCall.getArgList().subList(0, columnsCount)) {
+                    TblColRef column = inputColumnRowType.getColumnByIndex(index);
+                    if (!column.isInnerColumn()) {
+                        columns.add(column);
+                    }
+                }
+                if (!columns.isEmpty()) {
+                    parameter = ParameterDesc.newInstance(columns.toArray(new TblColRef[columns.size()]));
                 }
             }
             String expression = getAggrFuncName(aggCall);
@@ -341,10 +368,11 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
             AggregateCall aggCall = this.rewriteAggCalls.get(i);
             if (!aggCall.getArgList().isEmpty()) {
-                int index = aggCall.getArgList().get(0);
-                TblColRef column = inputColumnRowType.getColumnByIndex(index);
-                if (!column.isInnerColumn()) {
-                    this.context.metricsColumns.add(column);
+                for (Integer index : aggCall.getArgList()) {
+                    TblColRef column = inputColumnRowType.getColumnByIndex(index);
+                    if (!column.isInnerColumn()) {
+                        this.context.metricsColumns.add(column);
+                    }
                 }
             }
         }
@@ -385,18 +413,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
             return aggCall;
         }
 
-        // rebuild parameters
-        List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
-        if (func.needRewriteField()) {
-            RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
-            if (newArgList.isEmpty()) {
-                newArgList.add(field.getIndex());
-            } else {
-                // only the first column got overwritten
-                newArgList.set(0, field.getIndex());
-            }
-        }
-
         // rebuild function
         String callName = getSqlFuncName(aggCall);
         RelDataType fieldType = aggCall.getType();
@@ -408,12 +424,40 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
             newAgg = createCustomAggFunction(callName, fieldType, udafMap.get(callName));
         }
 
+        // rebuild parameters
+        List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
+        if (udafMap != null && udafMap.containsKey(callName)) {
+            newArgList = truncArgList(newArgList, udafMap.get(callName));
+        }
+        if (func.needRewriteField()) {
+            RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
+            if (newArgList.isEmpty()) {
+                newArgList.add(field.getIndex());
+            } else {
+                // TODO: only the first column got overwritten
+                newArgList.set(0, field.getIndex());
+            }
+        }
+
         // rebuild aggregate call
         @SuppressWarnings("deprecation")
         AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName);
         return newAggCall;
     }
 
+    /**
+     * truncate Arg List according to UDAF's "add" method parameter count
+     */
+    private List<Integer> truncArgList(List<Integer> argList, Class<?> udafClazz) {
+        int argListLength = argList.size();
+        for (Method method : udafClazz.getMethods()) {
+            if (method.getName().equals("add")) {
+                argListLength = Math.min(method.getParameterTypes().length - 1, argListLength);
+            }
+        }
+        return argList.subList(0, argListLength);
+    }
+
     private SqlAggFunction createCustomAggFunction(String funcName, RelDataType returnType, Class<?> customAggFuncClz) {
         RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
         SqlIdentifier sqlIdentifier = new SqlIdentifier(funcName, new SqlParserPos(1, 1));


[15/21] kylin git commit: minor change on kylin pom

Posted by li...@apache.org.
minor change on kylin pom


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f72a3f6d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f72a3f6d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f72a3f6d

Branch: refs/heads/KYLIN-2501
Commit: f72a3f6d35c028ebb5c68ee6bd11b97750fbf194
Parents: c82603a
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Mar 20 20:08:43 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 20 20:08:43 2017 +0800

----------------------------------------------------------------------
 pom.xml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f72a3f6d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dec04bc..0e41dcf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -806,8 +806,7 @@
     <dependencies>
 
         <!-- the logging dependencies are inherited by all modules for their generality
-            log4j and slf4j-log4j12 test scope only for UT/IT use
-        -->
+            log4j and slf4j-log4j12 test scope only for UT/IT use -->
         <dependency>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>


[12/21] kylin git commit: KYLIN-2489 upgrade zookeeper to 3.4.8

Posted by li...@apache.org.
KYLIN-2489 upgrade zookeeper to 3.4.8


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/98664f0d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/98664f0d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/98664f0d

Branch: refs/heads/KYLIN-2501
Commit: 98664f0decb0591ccc44692193145a1187201dfd
Parents: 0860048
Author: Li Yang <li...@apache.org>
Authored: Sun Mar 19 06:52:39 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Mar 19 06:52:39 2017 +0800

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/98664f0d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 30506e5..9a56dde 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
         <avatica.version>1.9.0</avatica.version>
 
         <!-- Hadoop Common deps, keep compatible with hadoop2.version -->
-        <zookeeper.version>3.4.6</zookeeper.version>
+        <zookeeper.version>3.4.8</zookeeper.version>
         <curator.version>2.7.1</curator.version>
         <jsr305.version>3.0.1</jsr305.version>
         <guava.version>14.0</guava.version>


[21/21] kylin git commit: KYLIN-2501 bugfix, pass IT

Posted by li...@apache.org.
KYLIN-2501 bugfix, pass IT


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d045a045
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d045a045
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d045a045

Branch: refs/heads/KYLIN-2501
Commit: d045a045a51f587afb35a997a87e249ad1da4adb
Parents: 7e3c423
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 22 16:31:45 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 31 14:59:51 2017 +0800

----------------------------------------------------------------------
 .../gridtable/GTStreamAggregateScanner.java     |  24 +++--
 .../apache/kylin/storage/StorageContext.java    |  12 ---
 .../gtrecord/GTCubeStorageQueryBase.java        |   7 --
 .../storage/gtrecord/PartitionResultMerger.java | 100 -------------------
 .../gtrecord/SegmentCubeTupleIterator.java      |   7 +-
 .../SortMergedPartitionResultIterator.java      |  81 +++++++++++++++
 .../gtrecord/StorageResponseGTScatter.java      |  13 ++-
 7 files changed, 108 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
index 1fde423..4eb5791 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.gridtable;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import org.apache.kylin.GTForwardingScanner;
@@ -38,11 +39,10 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
     private final GTScanRequest req;
     private final Comparator<GTRecord> keyComparator;
 
-    public GTStreamAggregateScanner(IGTScanner delegated,
-            GTScanRequest req, Comparator<GTRecord> keyComparator) {
+    public GTStreamAggregateScanner(IGTScanner delegated, GTScanRequest scanRequest) {
         super(delegated);
-        this.req = req;
-        this.keyComparator = keyComparator;
+        this.req = Preconditions.checkNotNull(scanRequest, "scanRequest");
+        this.keyComparator = GTRecord.getComparator(scanRequest.getAggrGroupBy());
     }
 
     @Override
@@ -172,14 +172,22 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
     private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
 
         private int[] gtDimsIdx;
-        private int[] gtMetricsIdx;
+        private int[] gtMetricsIdx; // specify which metric to return and their order
+        private int[] aggIdx; // specify the ith returning metric's aggStates index
+
         private Object[] result; // avoid object creation
 
         StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
             super(input);
             this.gtDimsIdx = gtDimsIdx;
             this.gtMetricsIdx = gtMetricsIdx;
-            result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+            this.aggIdx = new int[gtMetricsIdx.length];
+            for (int i = 0; i < aggIdx.length; i++) {
+                int metricIdx = gtMetricsIdx[i];
+                aggIdx[i] = metrics.trueBitIndexOf(metricIdx);
+            }
+
+            this.result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
         }
 
         private void decodeAndSetDimensions(GTRecord record) {
@@ -202,8 +210,8 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
         protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
             decodeAndSetDimensions(record);
             // set metrics
-            for (int i = 0; i < gtMetricsIdx.length; i++) {
-                result[gtDimsIdx.length + i] = aggStates[i];
+            for (int i = 0; i < aggIdx.length; i++) {
+                result[gtDimsIdx.length + i] = aggStates[aggIdx[i]];
             }
             return result;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index bb17054..4522261 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,12 +18,10 @@
 
 package org.apache.kylin.storage;
 
-import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 import org.slf4j.Logger;
@@ -49,9 +47,7 @@ public class StorageContext {
     private boolean exactAggregation = false;
     private boolean needStorageAggregation = false;
     private boolean enableCoprocessor = false;
-
     private boolean enableStreamAggregate = false;
-    private Comparator<GTRecord> groupKeyComparator;
 
     private IStorageQuery storageQuery;
     private AtomicLong processedRowCount = new AtomicLong();
@@ -242,12 +238,4 @@ public class StorageContext {
     public void enableStreamAggregate() {
         this.enableStreamAggregate = true;
     }
-
-    public Comparator<GTRecord> getGroupKeyComparator() {
-        return groupKeyComparator;
-    }
-
-    public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
-        this.groupKeyComparator = groupKeyComparator;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 82590a2..d91a0b4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,18 +26,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.RawQueryLastHacker;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -392,11 +389,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
 
         if (enabled) {
-            CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-            ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
-
             context.enableStreamAggregate();
-            context.setGroupKeyComparator(GTRecord.getComparator(cols));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
deleted file mode 100644
index 52029d3..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.gtrecord;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
-
-/**
- * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
- */
-public class PartitionResultMerger implements Iterable<GTRecord> {
-    private final ImmutableList<PartitionResultIterator> partitionResults;
-    private final GTInfo info;
-    private final Comparator<GTRecord> comparator;
-
-    public PartitionResultMerger(
-            Iterable<PartitionResultIterator> partitionResults,
-            GTInfo info, Comparator<GTRecord> comparator) {
-        this.partitionResults = ImmutableList.copyOf(partitionResults);
-        this.info = info;
-        this.comparator = comparator;
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        if (partitionResults.size() == 1) {
-            return partitionResults.get(0);
-        }
-        return new MergingResultsIterator();
-    }
-
-    private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
-        final GTRecord record = new GTRecord(info); // reuse to avoid object creation
-
-        PriorityQueue<PeekingIterator<GTRecord>> heap;
-
-        MergingResultsIterator() {
-            Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
-                public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
-                    return comparator.compare(o1.peek(), o2.peek());
-                }
-            };
-            this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
-
-            for (PartitionResultIterator it : partitionResults) {
-                if (it.hasNext()) {
-                    heap.offer(Iterators.peekingIterator(it));
-                }
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return !heap.isEmpty();
-        }
-
-        @Override
-        public GTRecord next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            // get smallest record
-            PeekingIterator<GTRecord> it = heap.poll();
-            // WATCH OUT! record got from PartitionResultIterator.next() may changed later,
-            // so we must make a shallow copy of it.
-            record.shallowCopyFrom(it.next());
-
-            if (it.hasNext()) {
-                heap.offer(it);
-            }
-
-            return record;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 11f766c..3bac5ec 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -90,8 +90,8 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
             final Iterator<GTRecord> records, final GTScanRequest scanRequest,
             final int[] gtDimsIdx, final int[] gtMetricsIdx) {
 
-        boolean singlePartitionResult = records instanceof PartitionResultIterator;
-        if (context.isStreamAggregateEnabled() && !singlePartitionResult) {
+        boolean hasMultiplePartitions = records instanceof SortMergedPartitionResultIterator;
+        if (hasMultiplePartitions && context.isStreamAggregateEnabled()) {
             // input records are ordered, leverage stream aggregator to produce possibly fewer records
             IGTScanner inputScanner = new IGTScanner() {
                 public GTInfo getInfo() {
@@ -104,8 +104,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
                     return records;
                 }
             };
-            GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
-                    inputScanner, scanRequest, context.getGroupKeyComparator());
+            GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest);
             return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
new file mode 100644
index 0000000..21e61e3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
+ */
+public class SortMergedPartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+
+    final GTRecord record ; // reuse to avoid object creation
+    PriorityQueue<PeekingIterator<GTRecord>> heap;
+
+    SortMergedPartitionResultIterator(
+            List<PartitionResultIterator> partitionResults,
+            GTInfo info, final Comparator<GTRecord> comparator) {
+
+        this.record = new GTRecord(info);
+        Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+            public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+                return comparator.compare(o1.peek(), o2.peek());
+            }
+        };
+        this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
+
+        for (PartitionResultIterator it : partitionResults) {
+            if (it.hasNext()) {
+                heap.offer(Iterators.peekingIterator(it));
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !heap.isEmpty();
+    }
+
+    @Override
+    public GTRecord next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        // get smallest record
+        PeekingIterator<GTRecord> it = heap.poll();
+        // WATCH OUT! record got from PartitionResultIterator.next() may changed later,
+        // so we must make a shallow copy of it.
+        record.shallowCopyFrom(it.next());
+
+        if (it.hasNext()) {
+            heap.offer(it);
+        }
+
+        return record;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d045a045/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 0f1e191..f1ab20c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -44,7 +44,7 @@ public class StorageResponseGTScatter implements IGTScanner {
     private IPartitionStreamer partitionStreamer;
     private final Iterator<byte[]> blocks;
     private final ImmutableBitSet columns;
-    private final StorageContext context;
+    private final ImmutableBitSet groupByDims;
     private final boolean needSorted; // whether scanner should return sorted records
 
     public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
@@ -52,7 +52,7 @@ public class StorageResponseGTScatter implements IGTScanner {
         this.partitionStreamer = partitionStreamer;
         this.blocks = partitionStreamer.asByteArrayIterator();
         this.columns = scanRequest.getColumns();
-        this.context = context;
+        this.groupByDims = scanRequest.getAggrGroupBy();
         this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled();
     }
 
@@ -74,13 +74,16 @@ public class StorageResponseGTScatter implements IGTScanner {
             partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
         }
 
+        if (partitionResults.size() == 1) {
+            return partitionResults.get(0);
+        }
+
         if (!needSorted) {
             logger.debug("Using Iterators.concat to merge partition results");
             return Iterators.concat(partitionResults.iterator());
         }
 
-        logger.debug("Using PartitionResultMerger to merge partition results");
-        PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
-        return merger.iterator();
+        logger.debug("Using SortMergedPartitionResultIterator to merge partition results");
+        return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims));
     }
 }


[08/21] kylin git commit: KYLIN-2514 handle disordered joins in data model

Posted by li...@apache.org.
KYLIN-2514 handle disordered joins in data model

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/14b96a86
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/14b96a86
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/14b96a86

Branch: refs/heads/KYLIN-2501
Commit: 14b96a86dc4cf6da78f2137838ca9827848f52be
Parents: 40249fe
Author: Roger Shi <ro...@hotmail.com>
Authored: Fri Mar 17 19:27:18 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 17 19:55:27 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/metadata/model/JoinsTree.java  | 43 +++++++++++++++++---
 .../model_desc/ci_left_join_model.json          | 28 ++++++-------
 2 files changed, 52 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/14b96a86/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index c7666cb..3c876a0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -24,8 +24,12 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
 
 public class JoinsTree  implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -40,12 +44,41 @@ public class JoinsTree  implements Serializable {
                 Preconditions.checkState(col.isQualified());
         }
 
-        tableChains.put(rootTable.getAlias(), new Chain(rootTable, null, null));
+        // Walk through joins to build FK table to joins mapping
+        HashMap<String, List<JoinDesc>> fkJoinMap = Maps.newHashMap();
+        int joinCount = 0;
+        for (JoinDesc join: joins) {
+            joinCount++;
+            String fkSideAlias = join.getFKSide().getAlias();
+            if (fkJoinMap.containsKey(fkSideAlias)) {
+                fkJoinMap.get(fkSideAlias).add(join);
+            } else {
+                List<JoinDesc> joinDescList = Lists.newArrayList(join);
+                fkJoinMap.put(fkSideAlias, joinDescList);
+            }
+        }
 
-        for (JoinDesc join : joins) {
-            TableRef pkSide = join.getPKSide();
-            Chain fkSide = tableChains.get(join.getFKSide().getAlias());
-            tableChains.put(pkSide.getAlias(), new Chain(pkSide, join, fkSide));
+        // Width-first build tree (tableChains)
+        Queue<Chain> chainBuff = Queues.newArrayDeque();
+        chainBuff.add(new Chain(rootTable, null, null));
+        int chainCount = 0;
+        while (!chainBuff.isEmpty()) {
+            Chain chain= chainBuff.poll();
+            String pkSideAlias = chain.table.getAlias();
+            chainCount++;
+            tableChains.put(pkSideAlias, chain);
+
+            // this round pk side is next round's fk side
+            if (fkJoinMap.containsKey(pkSideAlias)) {
+                for (JoinDesc join: fkJoinMap.get(pkSideAlias)) {
+                    chainBuff.add(new Chain(join.getPKSide(), join, chain));
+                }
+            }
+        }
+
+        // if join count not match (chain count - 1), there must be some join not take effect
+        if (joinCount != (chainCount - 1)) {
+            throw new IllegalArgumentException("There's some illegal Joins, please check your model");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/14b96a86/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json b/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json
index 1b08aaf..bc5b444 100644
--- a/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json
+++ b/examples/test_case_data/localmeta/model_desc/ci_left_join_model.json
@@ -31,20 +31,6 @@
       }
     },
     {
-      "table": "DEFAULT.TEST_ACCOUNT",
-      "alias": "SELLER_ACCOUNT",
-      "kind": "FACT",
-      "join": {
-        "type": "LEFT",
-        "primary_key": [
-          "SELLER_ACCOUNT.ACCOUNT_ID"
-        ],
-        "foreign_key": [
-          "TEST_KYLIN_FACT.SELLER_ID"
-        ]
-      }
-    },
-    {
       "table": "EDW.TEST_CAL_DT",
       "join": {
         "type": "LEFT",
@@ -119,6 +105,20 @@
           "SELLER_ACCOUNT.ACCOUNT_COUNTRY"
         ]
       }
+    },
+    {
+      "table": "DEFAULT.TEST_ACCOUNT",
+      "alias": "SELLER_ACCOUNT",
+      "kind": "FACT",
+      "join": {
+        "type": "LEFT",
+        "primary_key": [
+          "SELLER_ACCOUNT.ACCOUNT_ID"
+        ],
+        "foreign_key": [
+          "TEST_KYLIN_FACT.SELLER_ID"
+        ]
+      }
     }
   ],
   "dimensions": [


[20/21] kylin git commit: KYLIN-2501 Stream Aggregate GTRecords at Query Server

Posted by li...@apache.org.
KYLIN-2501 Stream Aggregate GTRecords at Query Server


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7e3c4234
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7e3c4234
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7e3c4234

Branch: refs/heads/KYLIN-2501
Commit: 7e3c4234e57f0f36c1c1829d7f97d67f8eeaa77b
Parents: 4c21821
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 15 22:45:02 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 31 14:59:51 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../kylin/common/util/ImmutableBitSet.java      |  29 ++-
 .../org/apache/kylin/GTForwardingScanner.java   |  56 +++++
 .../kylin/cube/gridtable/CubeGridTable.java     |  18 --
 .../gridtable/CuboidToGridTableMapping.java     |  18 ++
 .../cube/inmemcubing/InMemCubeBuilder.java      |   6 +-
 .../kylin/gridtable/GTAggregateScanner.java     |  16 +-
 .../apache/kylin/gridtable/GTFilterScanner.java |  22 +-
 .../org/apache/kylin/gridtable/GTRecord.java    |  80 +++----
 .../apache/kylin/gridtable/GTScanRequest.java   |  13 ++
 .../gridtable/GTStreamAggregateScanner.java     | 211 +++++++++++++++++++
 .../kylin/gridtable/GTScanReqSerDerTest.java    |   4 +-
 .../apache/kylin/storage/StorageContext.java    |  20 ++
 .../storage/gtrecord/CubeScanRangePlanner.java  |   3 +-
 .../storage/gtrecord/CubeSegmentScanner.java    |   7 +-
 .../storage/gtrecord/CubeTupleConverter.java    |  31 +--
 .../gtrecord/GTCubeStorageQueryBase.java        |  38 +++-
 .../kylin/storage/gtrecord/ITupleConverter.java |   3 +-
 .../gtrecord/PartitionResultIterator.java       |  59 ++++++
 .../storage/gtrecord/PartitionResultMerger.java | 100 +++++++++
 .../kylin/storage/gtrecord/ScannerWorker.java   |   5 +-
 .../gtrecord/SegmentCubeTupleIterator.java      |  72 ++++++-
 .../gtrecord/StorageResponseGTScatter.java      |  82 +++----
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   7 +-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |   5 +-
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |   5 +-
 26 files changed, 704 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 02349ad..9cd35c8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -802,6 +802,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.valueOf(getOptional("kylin.query.skip-empty-segments", "true"));
     }
 
+    public boolean isStreamAggregateEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true"));
+    }
+
     @Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold 
     public int getStoragePushDownLimitMax() {
         return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index b417877..5cdf08c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -19,8 +19,9 @@ package org.apache.kylin.common.util;
 
 import java.nio.ByteBuffer;
 import java.util.BitSet;
+import java.util.Iterator;
 
-public class ImmutableBitSet {
+public class ImmutableBitSet implements Iterable<Integer> {
 
     public static final ImmutableBitSet EMPTY = new ImmutableBitSet(new BitSet());
 
@@ -168,4 +169,30 @@ public class ImmutableBitSet {
             return new ImmutableBitSet(bitSet);
         }
     };
+
+    /**
+     * Iterate over the positions of the true (set) bits.
+     * @return the iterator
+     */
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            int index = 0;
+
+            @Override
+            public boolean hasNext() {
+                return index < arr.length;
+            }
+
+            @Override
+            public Integer next() {
+                return arr[index++];
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
new file mode 100644
index 0000000..de8c88d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin;
+
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link IGTScanner} which forwards all its method calls to another scanner.
+ *
+ * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
+ */
+public class GTForwardingScanner implements IGTScanner {
+    protected IGTScanner delegated;
+
+    protected GTForwardingScanner(IGTScanner delegated) {
+        this.delegated = checkNotNull(delegated, "delegated");
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return delegated.getInfo();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegated.close();
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return delegated.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index 563cf43..5cee9df 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -18,29 +18,11 @@
 
 package org.apache.kylin.cube.gridtable;
 
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dimension.IDimensionEncodingMap;
 import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.metadata.model.TblColRef;
 
 public class CubeGridTable {
-
-    public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) {
-        Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId);
-        return newGTInfo(cuboid, new CubeDimEncMap(cubeSeg));
-    }
-
-    public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) {
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
-        return newGTInfo(cuboid, new CubeDimEncMap(cubeDesc, dictionaryMap));
-    }
-
     public static GTInfo newGTInfo(Cuboid cuboid, IDimensionEncodingMap dimEncMap) {
         CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 2e5dd12..6879687 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -140,11 +140,29 @@ public class CuboidToGridTableMapping {
         return i == null ? -1 : i.intValue();
     }
 
+    public int[] getDimIndexes(Collection<TblColRef> dims) {
+        int[] result = new int[dims.size()];
+        int i = 0;
+        for (TblColRef dim : dims) {
+            result[i++] = getIndexOf(dim);
+        }
+        return result;
+    }
+
     public int getIndexOf(FunctionDesc metric) {
         Integer r = metrics2gt.get(metric);
         return r == null ? -1 : r;
     }
 
+    public int[] getMetricsIndexes(Collection<FunctionDesc> metrics) {
+        int[] result = new int[metrics.size()];
+        int i = 0;
+        for (FunctionDesc metric : metrics) {
+            result[i++] = getIndexOf(metric);
+        }
+        return result;
+    }
+
     public List<TblColRef> getCuboidDimensionsInGTOrder() {
         return cuboid.getColumns();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e08844e..a26e948 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTAggregateScanner;
 import org.apache.kylin.gridtable.GTBuilder;
@@ -108,7 +109,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
-        GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
+        GTInfo info = CubeGridTable.newGTInfo(
+                Cuboid.findById(cubeDesc, cuboidID),
+                new CubeDimEncMap(cubeDesc, dictionaryMap)
+        );
 
         // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
         // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 7cdd4f5..0dd6fa9 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -45,7 +45,6 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.metadata.datatype.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +62,7 @@ public class GTAggregateScanner implements IGTScanner {
     final ImmutableBitSet metrics;
     final String[] metricsAggrFuncs;
     final IGTScanner inputScanner;
+    final BufferedMeasureCodec measureCodec;
     final AggregationCache aggrCache;
     final long spillThreshold; // 0 means no memory control && no spill
     final int storagePushDownLimit;//default to be Int.MAX
@@ -86,6 +86,7 @@ public class GTAggregateScanner implements IGTScanner {
         this.metrics = req.getAggrMetrics();
         this.metricsAggrFuncs = req.getAggrMetricsFuncs();
         this.inputScanner = inputScanner;
+        this.measureCodec = req.createMeasureCodec();
         this.aggrCache = new AggregationCache();
         this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
         this.aggrMask = new boolean[metricsAggrFuncs.length];
@@ -175,7 +176,6 @@ public class GTAggregateScanner implements IGTScanner {
         final int keyLength;
         final boolean[] compareMask;
         boolean compareAll = true;
-        final BufferedMeasureCodec measureCodec;
 
         final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
             @Override
@@ -213,18 +213,6 @@ public class GTAggregateScanner implements IGTScanner {
             keyLength = compareMask.length;
             dumps = Lists.newArrayList();
             aggBufMap = createBuffMap();
-            measureCodec = createMeasureCodec();
-        }
-
-        private BufferedMeasureCodec createMeasureCodec() {
-            DataType[] types = new DataType[metrics.trueBitCount()];
-            for (int i = 0; i < types.length; i++) {
-                types[i] = info.getColumnType(metrics.trueBitAt(i));
-            }
-
-            BufferedMeasureCodec result = new BufferedMeasureCodec(types);
-            result.setBufferSize(info.getMaxColumnLength(metrics));
-            return result;
         }
 
         private boolean[] createCompareMask() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
index 717f89c..cad0a04 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
+import org.apache.kylin.GTForwardingScanner;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -33,17 +34,16 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
-public class GTFilterScanner implements IGTScanner {
+public class GTFilterScanner extends GTForwardingScanner {
 
-    final private IGTScanner inputScanner;
     final private TupleFilter filter;
     final private IFilterCodeSystem<ByteArray> filterCodeSystem;
     final private IEvaluatableTuple oneTuple; // avoid instance creation
 
     private GTRecord next = null;
 
-    public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
-        this.inputScanner = inputScanner;
+    public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException {
+        super(delegated);
         this.filter = req.getFilterPushDown();
         this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
         this.oneTuple = new IEvaluatableTuple() {
@@ -53,25 +53,15 @@ public class GTFilterScanner implements IGTScanner {
             }
         };
 
-        if (TupleFilter.isEvaluableRecursively(filter) == false)
+        if (!TupleFilter.isEvaluableRecursively(filter))
             throw new IllegalArgumentException();
     }
 
     @Override
-    public GTInfo getInfo() {
-        return inputScanner.getInfo();
-    }
-
-    @Override
-    public void close() throws IOException {
-        inputScanner.close();
-    }
-
-    @Override
     public Iterator<GTRecord> iterator() {
         return new Iterator<GTRecord>() {
 
-            private Iterator<GTRecord> inputIterator = inputScanner.iterator();
+            private Iterator<GTRecord> inputIterator = delegated.iterator();
             private FilterResultCache resultCache = new FilterResultCache(getInfo(), filter);
 
             @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index f4480c8..3397adc 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -21,7 +21,6 @@ package org.apache.kylin.gridtable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.List;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -46,18 +45,21 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
         }
         this.info = info;
     }
-
-    public GTRecord(GTRecord other) {
-        this.info = other.info;
-        this.cols = new ByteArray[info.getColumnCount()];
-        for (int i = 0; i < other.cols.length; i++) {
-            this.cols[i] = other.cols[i].copy();
+    
+    @Override
+    public GTRecord clone() { // deep copy
+        ByteArray[] cols = new ByteArray[this.cols.length];
+        for (int i = 0; i < cols.length; i++) {
+            cols[i] = this.cols[i].copy();
         }
+        return new GTRecord(this.info, cols);
     }
 
-    @Override
-    public Object clone() {
-        return new GTRecord(this);
+    public void shallowCopyFrom(GTRecord source) {
+        assert info == source.info;
+        for (int i = 0; i < cols.length; i++) {
+            cols[i].set(source.cols[i]);
+        }
     }
 
     public GTInfo getInfo() {
@@ -106,30 +108,18 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
     /** decode and return the values of this record */
     public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) {
         assert selectedCols.cardinality() == result.length;
-
         for (int i = 0; i < selectedCols.trueBitCount(); i++) {
-            int c = selectedCols.trueBitAt(i);
-            if (cols[c] == null || cols[c].array() == null) {
-                result[i] = null;
-            } else {
-                result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
-            }
+            result[i] = decodeValue(selectedCols.trueBitAt(i));
         }
         return result;
     }
 
-    /** decode and return the values of this record */
-    public Object[] getValues(int[] selectedColumns, Object[] result) {
-        assert selectedColumns.length <= result.length;
-        for (int i = 0; i < selectedColumns.length; i++) {
-            int c = selectedColumns[i];
-            if (cols[c].array() == null) {
-                result[i] = null;
-            } else {
-                result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
-            }
+    public Object decodeValue(int c) {
+        ByteArray col = cols[c];
+        if (col != null && col.array() != null) {
+            return info.codeSystem.decodeColumnValue(c, col.asBuffer());
         }
-        return result;
+        return null;
     }
 
     public int sizeOf(ImmutableBitSet selectedCols) {
@@ -198,19 +188,13 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
         return compareToInternal(o, info.colAll);
     }
 
-    public int compareToOnPrimaryKey(GTRecord o) {
-        return compareToInternal(o, info.primaryKey);
-    }
-
-    public static Comparator<GTRecord> getPrimaryKeyComparator() {
+    public static Comparator<GTRecord> getComparator(final ImmutableBitSet participateCols) {
         return new Comparator<GTRecord>() {
-            @Override
             public int compare(GTRecord o1, GTRecord o2) {
                 if (o1 == null || o2 == null) {
                     throw new IllegalStateException("Cannot handle null");
                 }
-
-                return o1.compareToOnPrimaryKey(o2);
+                return o1.compareToInternal(o2, participateCols);
             }
         };
     }
@@ -287,26 +271,14 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
         loadColumns(info.colBlocks[c], buf);
     }
 
-    /** change pointers to point to data in given buffer, UNLIKE deserialize */
-    public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
-        int pos = buf.position();
-        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
-            int c = selectedCols.trueBitAt(i);
-            int len = info.codeSystem.codeLength(c, buf);
-            cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
-            pos += len;
-            buf.position(pos);
-        }
-    }
-
-    /** change pointers to point to data in given buffer, UNLIKE deserialize
-     *  unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this
-     *  method allows to defined specific columns(in order) to load
+    /**
+     * Change pointers to point to data in given buffer, UNLIKE deserialize
+     * @param selectedCols positions of column to load
+     * @param buf buffer containing continuous data of selected columns
      */
-    public void loadColumns(List<Integer> selectedCols, ByteBuffer buf) {
+    public void loadColumns(Iterable<Integer> selectedCols, ByteBuffer buf) {
         int pos = buf.position();
-        for (int i = 0; i < selectedCols.size(); i++) {
-            int c = selectedCols.get(i);
+        for (int c : selectedCols) {
             int len = info.codeSystem.codeLength(c, buf);
             cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
             pos += len;

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 4629c8e..ae35d2b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -31,6 +31,8 @@ import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.SerializeToByteBuffer;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -202,6 +204,17 @@ public class GTScanRequest {
 
     }
 
+    public BufferedMeasureCodec createMeasureCodec() {
+        DataType[] metricTypes = new DataType[aggrMetrics.trueBitCount()];
+        for (int i = 0; i < metricTypes.length; i++) {
+            metricTypes[i] = info.getColumnType(aggrMetrics.trueBitAt(i));
+        }
+
+        BufferedMeasureCodec codec = new BufferedMeasureCodec(metricTypes);
+        codec.setBufferSize(info.getMaxColumnLength(aggrMetrics));
+        return codec;
+    }
+
     public boolean isDoingStorageAggregation() {
         return doingStorageAggregation;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
new file mode 100644
index 0000000..1fde423
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.kylin.GTForwardingScanner;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * GTStreamAggregateScanner requires input records to be sorted on group fields.
+ * In such cases, it's superior to hash/sort based aggregator because it can produce
+ * ordered outputs on the fly and the memory consumption is very low.
+ */
+public class GTStreamAggregateScanner extends GTForwardingScanner {
+    private final GTScanRequest req;
+    private final Comparator<GTRecord> keyComparator;
+
+    public GTStreamAggregateScanner(IGTScanner delegated,
+            GTScanRequest req, Comparator<GTRecord> keyComparator) {
+        super(delegated);
+        this.req = req;
+        this.keyComparator = keyComparator;
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return new StreamMergeGTRecordIterator(delegated.iterator());
+    }
+
+    public Iterator<Object[]> valuesIterator(int[] gtDimsIdx, int[] gtMetricsIdx) {
+        return new StreamMergeValuesIterator(delegated.iterator(), gtDimsIdx, gtMetricsIdx);
+    }
+
+    private abstract class AbstractStreamMergeIterator<E> implements Iterator<E> {
+        final PeekingIterator<GTRecord> input;
+        final IGTCodeSystem codeSystem;
+        final ImmutableBitSet dimensions;
+        final ImmutableBitSet metrics;
+        final String[] metricFuncs;
+        final BufferedMeasureCodec measureCodec;
+
+        private final GTRecord first; // reuse to avoid object creation
+
+        AbstractStreamMergeIterator(Iterator<GTRecord> input) {
+            this.input = Iterators.peekingIterator(input);
+            this.codeSystem = req.getInfo().getCodeSystem();
+            this.dimensions = req.getDimensions();
+            this.metrics = req.getAggrMetrics();
+            this.metricFuncs = req.getAggrMetricsFuncs();
+            this.measureCodec = req.createMeasureCodec();
+
+            this.first = new GTRecord(req.getInfo());
+        }
+
+        @Override
+        public boolean hasNext() {
+            return input.hasNext();
+        }
+
+        private boolean isSameKey(GTRecord o1, GTRecord o2) {
+            return keyComparator.compare(o1, o2) == 0;
+        }
+
+        private boolean shouldMergeNext(GTRecord current) {
+            return input.hasNext() && isSameKey(current, input.peek());
+        }
+
+        protected abstract E finalizeResult(GTRecord record);
+
+        protected abstract E finalizeResult(GTRecord record, Object[] aggStates);
+
+        @Override
+        public E next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            // WATCH OUT! record returned by "input" scanner could be changed later,
+            // so we must make a shallow copy of it.
+            first.shallowCopyFrom(input.next());
+
+            // shortcut to avoid extra deserialize/serialize cost
+            if (!shouldMergeNext(first)) {
+                return finalizeResult(first);
+            }
+            // merge records with the same key
+            MeasureAggregator[] aggrs = codeSystem.newMetricsAggregators(metrics, metricFuncs);
+            aggregate(aggrs, first);
+            aggregate(aggrs, input.next()); // no need to copy record because it's not referred to later
+            while (shouldMergeNext(first)) {
+                aggregate(aggrs, input.next());
+            }
+
+            Object[] aggStates = new Object[aggrs.length];
+            for (int i = 0; i < aggStates.length; i++) {
+                aggStates[i] = aggrs[i].getState();
+            }
+            return finalizeResult(first, aggStates);
+        }
+
+        @SuppressWarnings("unchecked")
+        protected void aggregate(MeasureAggregator[] aggregators, GTRecord record) {
+            for (int i = 0; i < aggregators.length; i++) {
+                int c = metrics.trueBitAt(i);
+                Object metric = codeSystem.decodeColumnValue(c, record.cols[c].asBuffer());
+                aggregators[i].aggregate(metric);
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove");
+        }
+    }
+
+    private class StreamMergeGTRecordIterator extends AbstractStreamMergeIterator<GTRecord> {
+
+        private GTRecord returnRecord; // avoid object creation
+
+        StreamMergeGTRecordIterator(Iterator<GTRecord> input) {
+            super(input);
+            this.returnRecord = new GTRecord(req.getInfo());
+        }
+
+        @Override
+        protected GTRecord finalizeResult(GTRecord record) {
+            return record;
+        }
+
+        @Override
+        protected GTRecord finalizeResult(GTRecord record, Object[] aggStates) {
+            // 1. load dimensions
+            for (int c : dimensions) {
+                returnRecord.cols[c] = record.cols[c];
+            }
+            // 2. serialize metrics
+            byte[] bytes = measureCodec.encode(aggStates).array();
+            int[] sizes = measureCodec.getMeasureSizes();
+            // 3. load metrics
+            int offset = 0;
+            for (int i = 0; i < metrics.trueBitCount(); i++) {
+                int c = metrics.trueBitAt(i);
+                returnRecord.cols[c].set(bytes, offset, sizes[i]);
+                offset += sizes[i];
+            }
+            return returnRecord;
+        }
+    }
+
+    private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
+
+        private int[] gtDimsIdx;
+        private int[] gtMetricsIdx;
+        private Object[] result; // avoid object creation
+
+        StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
+            super(input);
+            this.gtDimsIdx = gtDimsIdx;
+            this.gtMetricsIdx = gtMetricsIdx;
+            result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+        }
+
+        private void decodeAndSetDimensions(GTRecord record) {
+            for (int i = 0; i < gtDimsIdx.length; i++) {
+                result[i] = record.decodeValue(gtDimsIdx[i]);
+            }
+        }
+
+        @Override
+        protected Object[] finalizeResult(GTRecord record) {
+            decodeAndSetDimensions(record);
+            // decode metrics
+            for (int i = 0; i < gtMetricsIdx.length; i++) {
+                result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]);
+            }
+            return result;
+        }
+
+        @Override
+        protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
+            decodeAndSetDimensions(record);
+            // set metrics
+            for (int i = 0; i < gtMetricsIdx.length; i++) {
+                result[gtDimsIdx.length + i] = aggStates[i];
+            }
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
index 77cc2d8..1ae229a 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
@@ -29,6 +29,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -90,7 +91,8 @@ public class GTScanReqSerDerTest extends LocalFileMetadataTestCase {
         CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("test_kylin_cube_with_slr_ready");
         CubeSegment segment = cube.getFirstSegment();
 
-        GTInfo info = CubeGridTable.newGTInfo(segment, Cuboid.getBaseCuboidId(cube.getDescriptor()));
+        Cuboid baseCuboid = Cuboid.getBaseCuboid(cube.getDescriptor());
+        GTInfo info = CubeGridTable.newGTInfo(baseCuboid, new CubeDimEncMap(segment));
         GTInfo.serializer.serialize(info, buffer);
         buffer.flip();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 998f1db..bb17054 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.storage;
 
+import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 import org.slf4j.Logger;
@@ -48,6 +50,9 @@ public class StorageContext {
     private boolean needStorageAggregation = false;
     private boolean enableCoprocessor = false;
 
+    private boolean enableStreamAggregate = false;
+    private Comparator<GTRecord> groupKeyComparator;
+
     private IStorageQuery storageQuery;
     private AtomicLong processedRowCount = new AtomicLong();
     private Cuboid cuboid;
@@ -230,4 +235,19 @@ public class StorageContext {
         this.storageQuery = storageQuery;
     }
 
+    /**
+     * Tells whether stream aggregation of partition results has been switched on for this context.
+     */
+    public boolean isStreamAggregateEnabled() {
+        return this.enableStreamAggregate;
+    }
+
+    /**
+     * Switches on stream aggregation of partition results for this context (the flag starts out {@code false}).
+     */
+    public void enableStreamAggregate() {
+        enableStreamAggregate = true;
+    }
+
+    /**
+     * Returns the comparator that orders records by their group-by key, or {@code null} if none was set.
+     */
+    public Comparator<GTRecord> getGroupKeyComparator() {
+        return this.groupKeyComparator;
+    }
+
+    /**
+     * Stores the comparator used to order records by their group-by key.
+     *
+     * @param groupKeyComparator the comparator to keep; may replace a previously set one
+     */
+    public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
+        Comparator<GTRecord> newComparator = groupKeyComparator;
+        this.groupKeyComparator = newComparator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 6911827..c3cc858 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -38,6 +38,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.RecordComparators;
 import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
 import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
@@ -85,7 +86,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         Set<TblColRef> filterDims = Sets.newHashSet();
         TupleFilter.collectColumns(filter, filterDims);
 
-        this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId());
+        this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment));
         CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
 
         IGTComparator comp = gtInfo.getCodeSystem().getComparator();

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 4f206d4..31a9f99 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -78,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner {
         }
         scanRequest = scanRangePlanner.planScanRequest();
         String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
-        scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
+        scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
     }
 
     @Override
@@ -96,8 +96,7 @@ public class CubeSegmentScanner implements IGTScanner {
         return scanRequest == null ? null : scanRequest.getInfo();
     }
 
-    public CubeSegment getSegment() {
-        return this.cubeSeg;
+    public GTScanRequest getScanRequest() {
+        return scanRequest;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 280718f..b762e5c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -28,10 +28,8 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -43,7 +41,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
- * convert GTRecord to tuple
+ * Convert Object[] (decoded GTRecord) to tuple
  */
 public class CubeTupleConverter implements ITupleConverter {
 
@@ -54,7 +52,6 @@ public class CubeTupleConverter implements ITupleConverter {
 
     private final int[] gtColIdx;
     private final int[] tupleIdx;
-    private final Object[] gtValues;
     private final MeasureType<?>[] measureTypes;
 
     private final List<IAdvMeasureFiller> advMeasureFillers;
@@ -63,19 +60,16 @@ public class CubeTupleConverter implements ITupleConverter {
     private final int nSelectedDims;
 
     public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
-                              Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+                              Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx,
+                              TupleInfo returnTupleInfo) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
+        this.gtColIdx = gtColIdx;
         this.tupleInfo = returnTupleInfo;
         this.derivedColFillers = Lists.newArrayList();
 
-        List<TblColRef> cuboidDims = cuboid.getColumns();
-        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
         nSelectedDims = selectedDimensions.size();
-        gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
         tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
-        gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
 
         // measure types don't have this many, but aligned length make programming easier
         measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()];
@@ -89,21 +83,11 @@ public class CubeTupleConverter implements ITupleConverter {
 
         // pre-calculate dimension index mapping to tuple
         for (TblColRef dim : selectedDimensions) {
-            int dimIndex = mapping.getIndexOf(dim);
-            gtColIdx[i] = dimIndex;
             tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
-
-            //            if (tupleIdx[iii] == -1) {
-            //                throw new IllegalStateException("dim not used in tuple:" + dim);
-            //            }
-
             i++;
         }
 
         for (FunctionDesc metric : selectedMetrics) {
-            int metricIndex = mapping.getIndexOf(metric);
-            gtColIdx[i] = metricIndex;
-
             if (metric.needRewrite()) {
                 String rewriteFieldName = metric.getRewriteFieldName();
                 tupleIdx[i] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
@@ -126,7 +110,7 @@ public class CubeTupleConverter implements ITupleConverter {
         }
 
         // prepare derived columns and filler
-        Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
+        Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboid.getColumns(), null);
         for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
             TblColRef[] hostCols = entry.getKey().data;
             for (DeriveInfo deriveInfo : entry.getValue()) {
@@ -148,9 +132,8 @@ public class CubeTupleConverter implements ITupleConverter {
     }
 
     @Override
-    public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) {
-
-        record.getValues(gtColIdx, gtValues);
+    public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple) {
+        assert gtValues.length == gtColIdx.length;
 
         // dimensions
         for (int i = 0; i < nSelectedDims; i++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index ecf1ad3..82590a2 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,15 +26,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.RawQueryLastHacker;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -120,6 +123,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
         // set limit push down
         enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context);
+        // set whether to aggregate results from multiple partitions
+        enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
         // set query deadline
         context.setDeadline(cubeInstance);
 
@@ -144,8 +149,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
     protected abstract String getGTStorage();
     
-    protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo tupleInfo) {
-        return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+    protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx, TupleInfo tupleInfo) {
+        return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo);
     }
 
     protected void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -366,6 +371,35 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
+    /**
+     * Turns on stream aggregation in {@code context} and installs the group-key comparator used by the
+     * stream aggregator, provided the cube config enables the feature and it can pay off.
+     *
+     * <p>It is not enabled when any shard-by column of the cube is among the group-by columns, or when the
+     * context does not need storage aggregation.
+     *
+     * @param cuboid  cuboid being queried; supplies the cube descriptor and the grid-table column mapping
+     * @param groupsD group-by columns to check (presumably after derived-column translation — TODO confirm)
+     * @param context storage context to update
+     */
+    private void enableStreamAggregateIfBeneficial(Cuboid cuboid, Set<TblColRef> groupsD, StorageContext context) {
+        CubeDesc cubeDesc = cuboid.getCubeDesc();
+        boolean enabled = cubeDesc.getConfig().isStreamAggregateEnabled();
+
+        Set<TblColRef> shardByInGroups = Sets.newHashSet();
+        for (TblColRef col : cubeDesc.getShardByColumns()) {
+            if (groupsD.contains(col)) {
+                shardByInGroups.add(col);
+            }
+        }
+        if (!shardByInGroups.isEmpty()) {
+            enabled = false;
+            // parameterized logging: the message is only formatted when debug is enabled
+            logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: {}", shardByInGroups);
+        }
+
+        if (!context.isNeedStorageAggregation()) {
+            enabled = false;
+            logger.debug("Aggregate partition results is not beneficial because no storage aggregation");
+        }
+
+        if (enabled) {
+            // compare records only on the grid-table columns that correspond to the group-by columns
+            CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+            ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
+
+            context.enableStreamAggregate();
+            context.setGroupKeyComparator(GTRecord.getComparator(cols));
+        }
+    }
+
     protected void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
         Map<String, List<MeasureDesc>> map = Maps.newHashMap();
         for (MeasureDesc measure : cubeDesc.getMeasures()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
index 9c50d0c..dd48e4d 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
@@ -20,11 +20,10 @@ package org.apache.kylin.storage.gtrecord;
 
 import java.util.List;
 
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.tuple.Tuple;
 
 public interface ITupleConverter {
 
-    public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple);
+    public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
new file mode 100644
index 0000000..474e1e0
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+
+/**
+ * Walks the {@code GTRecord}s serialized in one storage partition's result bytes.
+ *
+ * <p>Every call to {@code next()} hands back the very same {@code GTRecord} instance, refilled in place,
+ * so a caller that needs to keep a record must copy it first.
+ */
+public class PartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+    private final ByteBuffer buffer;
+    private final ImmutableBitSet cols;
+    private final GTRecord record; // single instance refilled by next(), avoids per-row allocation
+
+    /**
+     * @param data bytes holding the serialized records of one partition
+     * @param info grid table info describing the record layout
+     * @param cols columns loaded from the buffer for each record
+     */
+    public PartitionResultIterator(byte[] data, GTInfo info, ImmutableBitSet cols) {
+        this.buffer = ByteBuffer.wrap(data);
+        this.cols = cols;
+        this.record = new GTRecord(info);
+    }
+
+    /** More records remain as long as unread bytes are left in the buffer. */
+    @Override
+    public boolean hasNext() {
+        return buffer.remaining() > 0;
+    }
+
+    @Override
+    public GTRecord next() {
+        if (hasNext()) {
+            record.loadColumns(cols, buffer);
+            return record;
+        }
+        throw new NoSuchElementException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
new file mode 100644
index 0000000..52029d3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sorts {@code GTRecord}s from all partitions, assuming the records within each partition are
+ * already sorted by the given comparator.
+ *
+ * <p>Note that the returned iterator hands out the same {@code GTRecord} instance on every call to
+ * {@code next()}. Client needs to copy the returned record when needed.
+ */
+public class PartitionResultMerger implements Iterable<GTRecord> {
+    private final ImmutableList<PartitionResultIterator> partitionResults;
+    private final GTInfo info;
+    private final Comparator<GTRecord> comparator;
+
+    /**
+     * @param partitionResults one iterator per partition, each sorted by {@code comparator}
+     * @param info             grid table info used to allocate the merged output record
+     * @param comparator       order shared by all partitions, and therefore the merge order
+     */
+    public PartitionResultMerger(
+            Iterable<PartitionResultIterator> partitionResults,
+            GTInfo info, Comparator<GTRecord> comparator) {
+        this.partitionResults = ImmutableList.copyOf(partitionResults);
+        this.info = info;
+        this.comparator = comparator;
+    }
+
+    /**
+     * Returns the records of all partitions in merged order. A single partition is already sorted,
+     * so its iterator is returned directly without merging.
+     */
+    @Override
+    public Iterator<GTRecord> iterator() {
+        if (partitionResults.size() == 1) {
+            return partitionResults.get(0);
+        }
+        return new MergingResultsIterator();
+    }
+
+    /**
+     * K-way merge: a heap of partition iterators ordered by each iterator's current head record.
+     */
+    private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
+        final GTRecord record = new GTRecord(info); // reuse to avoid object creation
+
+        final PriorityQueue<PeekingIterator<GTRecord>> heap;
+
+        MergingResultsIterator() {
+            Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+                @Override
+                public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+                    return comparator.compare(o1.peek(), o2.peek());
+                }
+            };
+            // PriorityQueue rejects an initial capacity below 1; without the floor an empty partition
+            // list would throw IllegalArgumentException here instead of yielding no records.
+            this.heap = new PriorityQueue<>(Math.max(1, partitionResults.size()), heapComparator);
+
+            // only non-empty partitions enter the heap, so peek() inside the comparator always has an element
+            for (PartitionResultIterator it : partitionResults) {
+                if (it.hasNext()) {
+                    heap.offer(Iterators.peekingIterator(it));
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !heap.isEmpty();
+        }
+
+        @Override
+        public GTRecord next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            // take the partition whose head record is smallest
+            PeekingIterator<GTRecord> it = heap.poll();
+            // WATCH OUT! The record returned by PartitionResultIterator.next() is reused and gets refilled
+            // when that partition advances (including the peek() triggered by re-offering it below),
+            // so we must make a shallow copy of it first.
+            record.shallowCopyFrom(it.next());
+
+            // put the partition back so its new head record competes in the heap
+            if (it.hasNext()) {
+                heap.offer(it);
+            }
+
+            return record;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
index 9e89227..fe22e9c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
@@ -30,6 +30,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStorage;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +39,7 @@ public class ScannerWorker {
     private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
     private IGTScanner internal = null;
 
-    public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) {
+    public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) {
         if (scanRequest == null) {
             logger.info("Segment {} will be skipped", segment);
             internal = new EmptyGTScanner();
@@ -48,7 +49,7 @@ public class ScannerWorker {
         final GTInfo info = scanRequest.getInfo();
 
         try {
-            IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior
+            IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior
             internal = rpc.getGTScanner(scanRequest);
         } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 37699a3..11f766c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -24,8 +24,14 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
+import com.google.common.collect.UnmodifiableIterator;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTStreamAggregateScanner;
+import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -49,7 +55,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
     protected final Tuple tuple;
     protected final StorageContext context;
 
-    protected Iterator<GTRecord> gtItr;
+    protected Iterator<Object[]> gtValues;
     protected ITupleConverter cubeTupleConverter;
     protected Tuple next;
 
@@ -66,12 +72,62 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
         this.tupleInfo = returnTupleInfo;
         this.tuple = new Tuple(returnTupleInfo);
         this.context = context;
-        this.gtItr = getGTItr(scanner);
-        this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+
+        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+        int[] gtDimsIdx = mapping.getDimIndexes(selectedDimensions);
+        int[] gtMetricsIdx = mapping.getMetricsIndexes(selectedMetrics);
+        // gtColIdx = gtDimsIdx + gtMetricsIdx
+        int[] gtColIdx = new int[gtDimsIdx.length + gtMetricsIdx.length];
+        System.arraycopy(gtDimsIdx, 0, gtColIdx, 0, gtDimsIdx.length);
+        System.arraycopy(gtMetricsIdx, 0, gtColIdx, gtDimsIdx.length, gtMetricsIdx.length);
+
+        this.gtValues = getGTValuesIterator(scanner.iterator(), scanner.getScanRequest(), gtDimsIdx, gtMetricsIdx);
+        this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(
+                scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo);
     }
 
-    private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) {
-        return scanner.iterator();
+    /**
+     * Turns the scanner's {@code GTRecord}s into decoded value arrays laid out as the selected dimensions
+     * (in {@code gtDimsIdx} order) followed by the selected metrics (in {@code gtMetricsIdx} order).
+     *
+     * <p>When stream aggregation is enabled and the input is not a single partition's result, the (ordered)
+     * records are fed through a {@link GTStreamAggregateScanner}, which may produce fewer rows. Otherwise each
+     * record is decoded as-is; that path reuses one array across {@code next()} calls, so callers must not
+     * retain it.
+     *
+     * @param records      records produced by the segment scanner
+     * @param scanRequest  scan request supplying the grid table info and aggregation setup
+     * @param gtDimsIdx    grid-table column indexes of the selected dimensions
+     * @param gtMetricsIdx grid-table column indexes of the selected metrics
+     */
+    private Iterator<Object[]> getGTValuesIterator(
+            final Iterator<GTRecord> records, final GTScanRequest scanRequest,
+            final int[] gtDimsIdx, final int[] gtMetricsIdx) {
+
+        boolean singlePartitionResult = records instanceof PartitionResultIterator;
+        if (context.isStreamAggregateEnabled() && !singlePartitionResult) {
+            // input records are ordered, leverage stream aggregator to produce possibly fewer records
+            IGTScanner inputScanner = new IGTScanner() {
+                @Override
+                public GTInfo getInfo() {
+                    return scanRequest.getInfo();
+                }
+
+                @Override
+                public void close() throws IOException {
+                    // no-op: this adapter only wraps an existing iterator and owns no resources
+                }
+
+                @Override
+                public Iterator<GTRecord> iterator() {
+                    return records;
+                }
+            };
+            GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
+                    inputScanner, scanRequest, context.getGroupKeyComparator());
+            return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
+        }
+
+        // simply decode records
+        return new UnmodifiableIterator<Object[]>() {
+            // single output array, overwritten by every next() to avoid per-record allocation
+            final Object[] result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+
+            @Override
+            public boolean hasNext() {
+                return records.hasNext();
+            }
+
+            @Override
+            public Object[] next() {
+                GTRecord record = records.next();
+                for (int i = 0; i < gtDimsIdx.length; i++) {
+                    result[i] = record.decodeValue(gtDimsIdx[i]);
+                }
+                for (int i = 0; i < gtMetricsIdx.length; i++) {
+                    result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]);
+                }
+                return result;
+            }
+        };
     }
 
     @Override
@@ -91,13 +147,13 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
         }
 
         // now we have a GTRecord
-        if (!gtItr.hasNext()) {
+        if (!gtValues.hasNext()) {
             return false;
         }
-        GTRecord curRecord = gtItr.next();
+        Object[] gtValues = this.gtValues.next();
 
         // translate into tuple
-        advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple);
+        advMeasureFillers = cubeTupleConverter.translateResult(gtValues, tuple);
 
         // the simple case
         if (advMeasureFillers == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 1a80bbf..0f1e191 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -18,22 +18,20 @@
 
 package org.apache.kylin.storage.gtrecord;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.storage.StorageContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * scatter the blob returned from region server to a iterable of gtrecords
@@ -42,18 +40,20 @@ public class StorageResponseGTScatter implements IGTScanner {
 
     private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class);
 
-    private GTInfo info;
+    private final GTInfo info;
     private IPartitionStreamer partitionStreamer;
-    private Iterator<byte[]> blocks;
-    private ImmutableBitSet columns;
-    private int storagePushDownLimit = -1;
+    private final Iterator<byte[]> blocks;
+    private final ImmutableBitSet columns;
+    private final StorageContext context;
+    private final boolean needSorted; // whether scanner should return sorted records
 
-    public StorageResponseGTScatter(GTInfo info, IPartitionStreamer partitionStreamer, ImmutableBitSet columns, int storagePushDownLimit) {
-        this.info = info;
+    public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
+        this.info = scanRequest.getInfo();
         this.partitionStreamer = partitionStreamer;
         this.blocks = partitionStreamer.asByteArrayIterator();
-        this.columns = columns;
-        this.storagePushDownLimit = storagePushDownLimit;
+        this.columns = scanRequest.getColumns();
+        this.context = context;
+        this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled();
     }
 
     @Override
@@ -69,48 +69,18 @@ public class StorageResponseGTScatter implements IGTScanner {
 
     @Override
     public Iterator<GTRecord> iterator() {
-        Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc());
-        if (StorageContext.mergeSortPartitionResults(storagePushDownLimit)) {
-            logger.info("Using SortedIteratorMergerWithLimit to merge partition results");
-            return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
-        } else {
-            logger.info("Using Iterators.concat to merge partition results");
-            return Iterators.concat(shardSubsets);
+        List<PartitionResultIterator> partitionResults = Lists.newArrayList();
+        while (blocks.hasNext()) {
+            partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
         }
-    }
-
-    class EndpointResponseGTScatterFunc implements Function<byte[], Iterator<GTRecord>> {
-        @Nullable
-        @Override
-        public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
-            return new Iterator<GTRecord>() {
-                private ByteBuffer inputBuffer = null;
-                //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
-                private GTRecord firstRecord = null;
-
-                @Override
-                public boolean hasNext() {
-                    if (inputBuffer == null) {
-                        inputBuffer = ByteBuffer.wrap(input);
-                        firstRecord = new GTRecord(info);
-                    }
 
-                    return inputBuffer.position() < inputBuffer.limit();
-                }
-
-                @Override
-                public GTRecord next() {
-                    firstRecord.loadColumns(columns, inputBuffer);
-                    return firstRecord;
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
+        if (!needSorted) {
+            logger.debug("Using Iterators.concat to merge partition results");
+            return Iterators.concat(partitionResults.iterator());
         }
-    }
 
+        logger.debug("Using PartitionResultMerger to merge partition results");
+        PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
+        return merger.iterator();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 82b67b6..e822ada 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -47,6 +47,7 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -69,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private static ExecutorService executorService = new LoggableCachedThreadPool();
 
-    public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
-        super(segment, cuboid, fullGTInfo);
+    public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
+        super(segment, cuboid, fullGTInfo, context);
     }
 
     private byte[] getByteArrayForShort(short v) {
@@ -245,7 +246,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             });
         }
 
-        return new StorageResponseGTScatter(fullGTInfo, new DummyPartitionStreamer(epResultItr), scanRequest.getColumns(), scanRequest.getStoragePushDownLimit());
+        return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext);
     }
 
     private ByteString serializeGTScanReq(GTScanRequest scanRequest) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 88e7176..db81646 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.IGTStorage;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,17 +65,19 @@ public abstract class CubeHBaseRPC implements IGTStorage {
     final protected Cuboid cuboid;
     final protected GTInfo fullGTInfo;
     final protected QueryContext queryContext;
+    final protected StorageContext storageContext;
 
     final private RowKeyEncoder fuzzyKeyEncoder;
     final private RowKeyEncoder fuzzyMaskEncoder;
 
-    public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
+    public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
         Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment");
         
         this.cubeSeg = (CubeSegment) segment;
         this.cuboid = cuboid;
         this.fullGTInfo = fullGTInfo;
         this.queryContext = QueryContext.current();
+        this.storageContext = context;
 
         this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
         this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);

http://git-wip-us.apache.org/repos/asf/kylin/blob/7e3c4234/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 33f8d90..951e2ef 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,8 +88,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
         }
     }
 
-    public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) {
-        super(segment, cuboid, fullGTInfo);
+    public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) {
+        super(segment, cuboid, fullGTInfo, context);
     }
 
     @Override


[04/21] kylin git commit: KYLIN-2510 Fix unintended NPE in CubeMetaExtractor

Posted by li...@apache.org.
KYLIN-2510 Fix unintended NPE in CubeMetaExtractor


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8be842e9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8be842e9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8be842e9

Branch: refs/heads/KYLIN-2501
Commit: 8be842e9b845477541c2569ae1c2484e9d627214
Parents: dde297e
Author: lidongsjtu <li...@apache.org>
Authored: Thu Mar 16 09:44:47 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Mar 16 09:44:54 2017 +0800

----------------------------------------------------------------------
 tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8be842e9/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
index e370e48..188524d 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
@@ -65,6 +65,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -172,6 +173,7 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
             String projectNames = optionsHelper.getOptionValue(OPTION_PROJECT);
             for (String projectName : projectNames.split(",")) {
                 ProjectInstance projectInstance = projectManager.getProject(projectName);
+                Preconditions.checkNotNull(projectInstance, "Project " + projectName + " does not exist.");
                 requireProject(projectInstance);
             }
         } else if (optionsHelper.hasOption(OPTION_CUBE)) {
@@ -202,9 +204,6 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
     }
 
     private void requireProject(ProjectInstance projectInstance) throws IOException {
-        if (projectInstance == null) {
-            throw new IllegalArgumentException("Project " + projectInstance.getName() + " does not exist");
-        }
         addRequired(projectInstance.getResourcePath());
         List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
         for (RealizationEntry realizationEntry : realizationEntries) {


[19/21] kylin git commit: KYLIN-2518 Optimize put row key to hll

Posted by li...@apache.org.
KYLIN-2518 Optimize putting row keys into HLL

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c218214
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c218214
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c218214

Branch: refs/heads/KYLIN-2501
Commit: 4c21821471cb261cfecdf8289c5f8284af817b3e
Parents: b1cc0dd
Author: xiefan46 <95...@qq.com>
Authored: Mon Mar 27 18:13:03 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 29 11:01:24 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/measure/hllc/HLLCounter.java   |  54 ++--
 .../mr/steps/FactDistinctColumnsMapper.java     |  31 +-
 .../mr/steps/NewCubeSamplingMethodTest.java     | 299 +++++++++++++++++++
 3 files changed, 341 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index 82c881b..b793465 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -60,7 +60,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         merge(another);
     }
 
-    HLLCounter(int p, RegisterType type) {
+    public HLLCounter(int p, RegisterType type) {
         this(p, type, Hashing.murmur3_128());
     }
 
@@ -99,6 +99,10 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         add(hashFunc.hashBytes(value, offset, length).asLong());
     }
 
+    public void addHashDirectly(long hash){
+        add(hash);
+    }
+
     protected void add(long hash) {
         int bucketMask = m - 1;
         int bucket = (int) (hash & bucketMask);
@@ -126,7 +130,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
     }
 
     private void toDenseIfNeeded() {
-        if (register instanceof SparseRegister) {
+        if (register.getRegisterType() == RegisterType.SPARSE) {
             if (isDense(register.getSize())) {
                 register = ((SparseRegister) register).toDense(p);
             }
@@ -137,36 +141,36 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         assert this.p == another.p;
         assert this.hashFunc == another.hashFunc;
         switch (register.getRegisterType()) {
-        case SINGLE_VALUE:
-            switch (another.getRegisterType()) {
             case SINGLE_VALUE:
-                if (register.getSize() > 0 && another.register.getSize() > 0) {
-                    register = ((SingleValueRegister) register).toSparse();
-                } else {
-                    SingleValueRegister sr = (SingleValueRegister) another.register;
-                    if (sr.getSize() > 0)
-                        register.set(sr.getSingleValuePos(), sr.getValue());
-                    return;
+                switch (another.getRegisterType()) {
+                    case SINGLE_VALUE:
+                        if (register.getSize() > 0 && another.register.getSize() > 0) {
+                            register = ((SingleValueRegister) register).toSparse();
+                        } else {
+                            SingleValueRegister sr = (SingleValueRegister) another.register;
+                            if (sr.getSize() > 0)
+                                register.set(sr.getSingleValuePos(), sr.getValue());
+                            return;
+                        }
+                        break;
+                    case SPARSE:
+                        register = ((SingleValueRegister) register).toSparse();
+                        break;
+                    case DENSE:
+                        register = ((SingleValueRegister) register).toDense(this.p);
+                        break;
+                    default:
+                        break;
                 }
+
                 break;
             case SPARSE:
-                register = ((SingleValueRegister) register).toSparse();
-                break;
-            case DENSE:
-                register = ((SingleValueRegister) register).toDense(this.p);
+                if (another.getRegisterType() == RegisterType.DENSE) {
+                    register = ((SparseRegister) register).toDense(p);
+                }
                 break;
             default:
                 break;
-            }
-
-            break;
-        case SPARSE:
-            if (another.getRegisterType() == RegisterType.DENSE) {
-                register = ((SparseRegister) register).toDense(p);
-            }
-            break;
-        default:
-            break;
         }
         register.merge(another.register);
         toDenseIfNeeded();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 9f65163..e6cea2b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -24,13 +24,13 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -62,7 +62,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     private HashFunction hf = null;
     private int rowCount = 0;
     private int samplingPercentage;
-    private ByteArray[] row_hashcodes = null;
+    //private ByteArray[] row_hashcodes = null;
+    private long[] rowHashCodesLong = null;
     private ByteBuffer tmpbuf;
     private static final Text EMPTY_TEXT = new Text();
     public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
@@ -92,14 +93,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
             allCuboidsHLL = new HLLCounter[cuboidIds.length];
             for (int i = 0; i < cuboidIds.length; i++) {
-                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
             }
 
-            hf = Hashing.murmur3_32();
-            row_hashcodes = new ByteArray[nRowKey];
-            for (int i = 0; i < nRowKey; i++) {
-                row_hashcodes[i] = new ByteArray();
-            }
+            hf = Hashing.murmur3_128();
+            rowHashCodesLong = new long[nRowKey];
 
             TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
             if (partitionColRef != null) {
@@ -211,26 +209,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     }
 
     private void putRowKeyToHLL(String[] row) {
-
         //generate hash for each row key column
         for (int i = 0; i < nRowKey; i++) {
             Hasher hc = hf.newHasher();
             String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue != null) {
-                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
-            } else {
-                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-            }
+            if (colValue == null)
+                colValue = "0";
+            byte[] bytes = hc.putString(colValue).hash().asBytes();
+            rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
         }
 
         // user the row key column hash to get a consolidated hash for each cuboid
         for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
+            long value = 0;
             for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+                value += rowHashCodesLong[allCuboidsBitSet[i][position]];
             }
-
-            allCuboidsHLL[i].add(hc.hash().asBytes());
+            allCuboidsHLL[i].addHashDirectly(value);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
new file mode 100644
index 0000000..f018f28
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@Ignore
+public class NewCubeSamplingMethodTest {
+
+    private static final int ROW_LENGTH = 10;
+
+    private Integer[][] allCuboidsBitSet;
+
+    private long baseCuboidId;
+
+    private final int rowCount = 500000;
+
+    @Before
+    public void setup() {
+        baseCuboidId = (1L << ROW_LENGTH) - 1;
+        createAllCuboidBitSet();
+        System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+    }
+
+    @Ignore
+    @Test
+    public void testRandomData() throws Exception {
+        List<List<String>> dataSet = getRandomDataset(rowCount);
+        comparePerformanceBasic(dataSet);
+        compareAccuracyBasic(dataSet);
+    }
+
+
+    @Ignore
+    @Test
+    public void testSmallCardData() throws Exception {
+        List<List<String>> dataSet = getSmallCardDataset(rowCount);
+        comparePerformanceBasic(dataSet);
+        compareAccuracyBasic(dataSet);
+    }
+
+
+    public void comparePerformanceBasic(final List<List<String>> rows) throws Exception {
+        //old hash method
+        ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+        HLLCounter[] cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+        long start = System.currentTimeMillis();
+        for (List<String> row : rows) {
+            putRowKeyToHLL(row, colHashValues, cuboidCounters, Hashing.murmur3_32());
+        }
+        long totalTime = System.currentTimeMillis() - start;
+        System.out.println("old method cost time : " + totalTime);
+        //new hash method
+        colHashValues = getNewColHashValues(ROW_LENGTH);
+        cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+        start = System.currentTimeMillis();
+        long[] valueHashLong = new long[allCuboidsBitSet.length];
+        for (List<String> row : rows) {
+            putRowKeyToHLLNew(row, valueHashLong, cuboidCounters, Hashing.murmur3_128());
+        }
+        totalTime = System.currentTimeMillis() - start;
+        System.out.println("new method cost time : " + totalTime);
+    }
+
+    //test accuracy
+    public void compareAccuracyBasic(final List<List<String>> rows) throws Exception {
+        final long realCardinality = countCardinality(rows);
+        System.out.println("real cardinality : " + realCardinality);
+        //test1
+        long t1 = runAndGetTime(new TestCase() {
+            @Override
+            public void run() throws Exception {
+                HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+                final ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+                HashFunction hf = Hashing.murmur3_32();
+                for (List<String> row : rows) {
+
+                    int x = 0;
+                    for (String field : row) {
+                        Hasher hc = hf.newHasher();
+                        colHashValues[x++].set(hc.putString(field).hash().asBytes());
+                    }
+
+                    Hasher hc = hf.newHasher();
+                    for (int position = 0; position < colHashValues.length; position++) {
+                        hc.putBytes(colHashValues[position].array());
+                    }
+                    counter.add(hc.hash().asBytes());
+                }
+                long estimate = counter.getCountEstimate();
+                System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+            }
+        });
+
+
+        long t2 = runAndGetTime(new TestCase() {
+            @Override
+            public void run() throws Exception {
+                HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+                HashFunction hf2 = Hashing.murmur3_128();
+                long[] valueHashLong = new long[allCuboidsBitSet.length];
+                for (List<String> row : rows) {
+
+                    int x = 0;
+                    for (String field : row) {
+                        Hasher hc = hf2.newHasher();
+                        byte[] bytes = hc.putString(x + field).hash().asBytes();
+                        valueHashLong[x++] = Bytes.toLong(bytes);
+                    }
+
+                    long value = 0;
+                    for (int position = 0; position < row.size(); position++) {
+                        value += valueHashLong[position];
+                    }
+                    counter.addHashDirectly(value);
+                }
+                long estimate = counter.getCountEstimate();
+                System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+            }
+        });
+    }
+
+    public void createAllCuboidBitSet() {
+        List<Long> allCuboids = Lists.newArrayList();
+        List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+        for (long i = 1; i < baseCuboidId; i++) {
+            allCuboids.add(i);
+            addCuboidBitSet(i, allCuboidsBitSetList);
+        }
+        allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+    }
+
+    private ByteArray[] getNewColHashValues(int rowLength) {
+        ByteArray[] colHashValues = new ByteArray[rowLength];
+        for (int i = 0; i < rowLength; i++) {
+            colHashValues[i] = new ByteArray();
+        }
+        return colHashValues;
+    }
+
+    private HLLCounter[] getNewCuboidCounters(int cuboidNum) {
+        HLLCounter[] counters = new HLLCounter[cuboidNum];
+        for (int i = 0; i < counters.length; i++)
+            counters[i] = new HLLCounter(14, RegisterType.DENSE);
+        return counters;
+    }
+
+
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
+        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            if ((mask & cuboidId) > 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >> 1;
+        }
+
+        allCuboidsBitSet.add(indice);
+
+    }
+
+    private long runAndGetTime(TestCase testCase) throws Exception {
+        long start = System.currentTimeMillis();
+        testCase.run();
+        long totalTime = System.currentTimeMillis() - start;
+        return totalTime;
+    }
+
+    interface TestCase {
+        void run() throws Exception;
+    }
+
+    /**
+     * Hashes every field of a row into {@code colHashValues}, then, for each cuboid listed in
+     * {@code allCuboidsBitSet}, hashes the bytes of that cuboid's column hashes together and
+     * adds the result to the cuboid's counter.
+     *
+     * @param row            field values in column order
+     * @param colHashValues  per-column holders, overwritten with each field's hash
+     * @param cuboidCounters one counter per entry of {@code allCuboidsBitSet}
+     * @param hashFunction   hash function used for both field and cuboid hashes
+     */
+    private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
+        int col = 0;
+        for (String field : row) {
+            byte[] fieldHash = hashFunction.newHasher().putString(field).hash().asBytes();
+            colHashValues[col].set(fieldHash);
+            col++;
+        }
+
+        for (int c = 0; c < allCuboidsBitSet.length; c++) {
+            Hasher cuboidHasher = hashFunction.newHasher();
+            for (Integer pos : allCuboidsBitSet[c]) {
+                cuboidHasher.putBytes(colHashValues[pos].array());
+            }
+            cuboidCounters[c].add(cuboidHasher.hash().asBytes());
+        }
+    }
+
+    /**
+     * Variant of {@code putRowKeyToHLL}: each field is hashed together with its column index
+     * and reduced to a {@code long} via {@code Bytes.toLong}; a cuboid's value is the sum of
+     * its columns' longs, passed straight to {@code addHashDirectly}.
+     *
+     * @param row            field values in column order
+     * @param hashValuesLong per-column slots, overwritten with each field's long hash
+     * @param cuboidCounters one counter per entry of {@code allCuboidsBitSet}
+     * @param hashFunction   hash function applied to "index + field"
+     */
+    private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
+        int col = 0;
+        for (String field : row) {
+            byte[] digest = hashFunction.newHasher().putString(col + field).hash().asBytes();
+            hashValuesLong[col] = Bytes.toLong(digest);
+            col++;
+        }
+
+        for (int c = 0; c < allCuboidsBitSet.length; c++) {
+            long combined = 0;
+            for (Integer pos : allCuboidsBitSet[c]) {
+                combined += hashValuesLong[pos];
+            }
+            cuboidCounters[c].addHashDirectly(combined);
+        }
+    }
+
+    /**
+     * Builds a dataset of {@code size} rows, each produced by {@code getRandomRow()}.
+     * A non-positive {@code size} yields an empty list.
+     */
+    private List<List<String>> getRandomDataset(int size) {
+        List<List<String>> dataset = new ArrayList<>();
+        for (int remaining = size; remaining > 0; remaining--) {
+            dataset.add(getRandomRow());
+        }
+        return dataset;
+    }
+
+    /**
+     * Builds a dataset of {@code size} rows, each produced by {@code getSmallCardRow()}.
+     * A non-positive {@code size} yields an empty list.
+     */
+    private List<List<String>> getSmallCardDataset(int size) {
+        List<List<String>> dataset = new ArrayList<>();
+        int remaining = size;
+        while (remaining > 0) {
+            dataset.add(getSmallCardRow());
+            remaining--;
+        }
+        return dataset;
+    }
+
+    /**
+     * Builds one row of {@code ROW_LENGTH} fields, each a value of
+     * {@code RandomStringUtils.random(10)}.
+     */
+    private List<String> getRandomRow() {
+        List<String> fields = new ArrayList<>();
+        int remaining = ROW_LENGTH;
+        while (remaining > 0) {
+            fields.add(RandomStringUtils.random(10));
+            remaining--;
+        }
+        return fields;
+    }
+
+    /** Candidate values for the first field of a small-cardinality row. */
+    private String[] smallCardRow = new String[] {"abc", "bcd", "jifea", "feaifj"};
+
+    /** Picks the first field of each small-cardinality row; seeded from the current time. */
+    private Random rand = new Random(System.currentTimeMillis());
+
+    /**
+     * Builds a row whose first field is chosen at random from {@code smallCardRow} and whose
+     * remaining fields (up to {@code ROW_LENGTH}) are all {@code "abc"}; a dataset of such rows
+     * therefore has at most {@code smallCardRow.length} distinct rows.
+     */
+    private List<String> getSmallCardRow() {
+        List<String> fields = new ArrayList<>();
+        String first = smallCardRow[rand.nextInt(smallCardRow.length)];
+        fields.add(first);
+        for (int col = 1; col < ROW_LENGTH; col++) {
+            fields.add("abc");
+        }
+        return fields;
+    }
+
+
+    /**
+     * Counts the distinct rows in a dataset.
+     * <p>
+     * Rows are compared element by element as {@code List}s. Concatenating the fields without
+     * a delimiter would treat different rows such as {@code ["ab", "c"]} and {@code ["a", "bc"]}
+     * as equal and under-count the real cardinality.
+     *
+     * @param rows the dataset; each row is a list of field values
+     * @return the exact number of distinct rows
+     */
+    private int countCardinality(List<List<String>> rows) {
+        Set<List<String>> distinctRows = new HashSet<List<String>>(rows);
+        return distinctRows.size();
+    }
+
+    /**
+     * Relative error of an estimate, {@code |estimate - real| / real}.
+     * <p>
+     * The difference is computed in {@code long} arithmetic and then widened to {@code double}.
+     * There is no guard for {@code real == 0}; the result is then infinite or NaN.
+     */
+    private double countErrorRate(long estimate, long real) {
+        double diff = estimate - real;
+        return Math.abs(diff) / real;
+    }
+}


[05/21] kylin git commit: minor, process NoSuchMethodError with lower version Kafka consumer

Posted by li...@apache.org.
minor, process NoSuchMethodError with lower version Kafka consumer


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2ab9bf2d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2ab9bf2d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2ab9bf2d

Branch: refs/heads/KYLIN-2501
Commit: 2ab9bf2d88aae2e8fa6189e8e375e140e9dbb4b5
Parents: 8be842e
Author: Billy Liu <bi...@apache.org>
Authored: Wed Mar 15 21:03:47 2017 -0700
Committer: Billy Liu <bi...@apache.org>
Committed: Wed Mar 15 21:03:47 2017 -0700

----------------------------------------------------------------------
 .../kylin/source/kafka/config/KafkaConsumerProperties.java       | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2ab9bf2d/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 5b3dd87..cc32ed9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -79,8 +79,8 @@ public class KafkaConsumerProperties {
         Set<String> configNames = new HashSet<String>();
         try {
             configNames = ConsumerConfig.configNames();
-        } catch (Exception e) {
-            // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException
+        } catch (Error e) {
+            // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException which is an Error, not Exception
             String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
                     + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(",");
             configNames.addAll(Arrays.asList(configNamesArray));


[14/21] kylin git commit: minor change on kylin pom

Posted by li...@apache.org.
minor change on kylin pom


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c82603a7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c82603a7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c82603a7

Branch: refs/heads/KYLIN-2501
Commit: c82603a7f4a0f8b1128cfe057a12b922cd1282f8
Parents: 3ae6f9c
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Mar 20 19:54:38 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Mar 20 19:54:38 2017 +0800

----------------------------------------------------------------------
 pom.xml | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c82603a7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9a56dde..dec04bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,9 +62,8 @@
         <!-- Spark versions -->
         <spark.version>1.6.3</spark.version>
         <kryo.version>4.0.0</kryo.version>
-        <!--
-        <reflections.version>0.9.10</reflections.version>
-        -->
+        
+        <!-- <reflections.version>0.9.10</reflections.version> -->
 
         <!-- Calcite Version -->
         <calcite.version>1.11.0</calcite.version>


[11/21] kylin git commit: KYLIN-2514 reorder joinTable first

Posted by li...@apache.org.
KYLIN-2514 reorder joinTable first

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/08600484
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/08600484
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/08600484

Branch: refs/heads/KYLIN-2501
Commit: 0860048484639aa07f745726a58c8544a911e5f6
Parents: 19c87e7
Author: Roger Shi <ro...@hotmail.com>
Authored: Sat Mar 18 14:58:25 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Sat Mar 18 17:29:20 2017 +0800

----------------------------------------------------------------------
 .../kylin/metadata/model/DataModelDesc.java     | 43 +++++++++++++++++++-
 .../apache/kylin/metadata/model/JoinsTree.java  | 42 ++-----------------
 2 files changed, 45 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/08600484/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 0a303ec..a26ccce 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -19,11 +19,13 @@
 package org.apache.kylin.metadata.model;
 
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -49,11 +51,11 @@ import com.google.common.collect.Sets;
 public class DataModelDesc extends RootPersistentEntity {
     private static final Logger logger = LoggerFactory.getLogger(DataModelDesc.class);
 
-    public static enum TableKind implements Serializable{
+    public static enum TableKind implements Serializable {
         FACT, LOOKUP
     }
 
-    public static enum RealizationCapacity implements Serializable{
+    public static enum RealizationCapacity implements Serializable {
         SMALL, MEDIUM, LARGE
     }
 
@@ -295,6 +297,7 @@ public class DataModelDesc extends RootPersistentEntity {
         initJoinTablesForUpgrade();
         initTableAlias(tables);
         initJoinColumns();
+        reorderJoins(tables);
         initJoinsTree();
         initDimensionsAndMetrics();
         initPartitionDesc();
@@ -452,6 +455,42 @@ public class DataModelDesc extends RootPersistentEntity {
         joinsTree = new JoinsTree(rootFactTableRef, joins);
     }
 
+    /**
+     * Reorders {@code joinTables} breadth-first from the root fact table, so that each join comes
+     * after the join that introduces its FK-side table.
+     * <p>
+     * Joins are grouped by the alias of their FK side. Starting from the root fact table, each
+     * dequeued join's PK-side alias selects the next joins to enqueue.
+     *
+     * @param tables table descriptors; the root fact table is looked up by {@code rootFactTable}
+     * @throws IllegalArgumentException if some join is not reachable from the root fact table,
+     *                                  or a table is reached more than once (e.g. a cycle)
+     */
+    private void reorderJoins(Map<String, TableDesc> tables) {
+        if (joinTables.length == 0) {
+            return;
+        }
+
+        Map<String, List<JoinTableDesc>> fkMap = Maps.newHashMap();
+        for (JoinTableDesc joinTable : joinTables) {
+            JoinDesc join = joinTable.getJoin();
+            String fkSideName = join.getFKSide().getAlias();
+            List<JoinTableDesc> joinTableList = fkMap.get(fkSideName);
+            if (joinTableList == null) {
+                joinTableList = Lists.newArrayList();
+                fkMap.put(fkSideName, joinTableList);
+            }
+            joinTableList.add(joinTable);
+        }
+
+        JoinTableDesc[] orderedJoinTables = new JoinTableDesc[joinTables.length];
+        int orderedIndex = 0;
+
+        Queue<JoinTableDesc> joinTableBuff = new ArrayDeque<JoinTableDesc>();
+        TableDesc rootDesc = tables.get(rootFactTable);
+        // NOTE(review): keys of fkMap are aliases; this assumes the root fact table's alias is its table name
+        List<JoinTableDesc> rootJoins = fkMap.get(rootDesc.getName());
+        if (rootJoins != null) {
+            joinTableBuff.addAll(rootJoins);
+        }
+        while (!joinTableBuff.isEmpty()) {
+            if (orderedIndex == orderedJoinTables.length) {
+                // more joins dequeued than declared: some table is reached twice, e.g. via a cycle
+                throw new IllegalArgumentException("There's some illegal Joins, please check your model");
+            }
+            JoinTableDesc head = joinTableBuff.poll();
+            orderedJoinTables[orderedIndex++] = head;
+            String headAlias = head.getJoin().getPKSide().getAlias();
+            List<JoinTableDesc> nextJoins = fkMap.get(headAlias);
+            if (nextJoins != null) {
+                joinTableBuff.addAll(nextJoins);
+            }
+        }
+
+        if (orderedIndex != orderedJoinTables.length) {
+            // some joins are unreachable from the root fact table; accepting them would leave null slots
+            throw new IllegalArgumentException("There's some illegal Joins, please check your model");
+        }
+
+        joinTables = orderedJoinTables;
+    }
+
     private boolean validate() {
 
         // ensure no dup between dimensions/metrics

http://git-wip-us.apache.org/repos/asf/kylin/blob/08600484/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index c6df52e..8e2192f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -19,17 +19,13 @@
 package org.apache.kylin.metadata.model;
 
 import java.io.Serializable;
-import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 public class JoinsTree implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -44,41 +40,11 @@ public class JoinsTree implements Serializable {
                 Preconditions.checkState(col.isQualified());
         }
 
-        // Walk through joins to build FK table to joins mapping
-        HashMap<String, List<JoinDesc>> fkJoinMap = Maps.newHashMap();
-        int joinCount = 0;
+        tableChains.put(rootTable.getAlias(), new Chain(rootTable, null, null));
         for (JoinDesc join : joins) {
-            joinCount++;
-            String fkSideAlias = join.getFKSide().getAlias();
-            if (fkJoinMap.containsKey(fkSideAlias)) {
-                fkJoinMap.get(fkSideAlias).add(join);
-            } else {
-                List<JoinDesc> joinDescList = Lists.newArrayList(join);
-                fkJoinMap.put(fkSideAlias, joinDescList);
-            }
-        }
-
-        // Width-first build tree (tableChains)
-        Queue<Chain> chainBuff = new ArrayDeque<Chain>();
-        chainBuff.add(new Chain(rootTable, null, null));
-        int chainCount = 0;
-        while (!chainBuff.isEmpty()) {
-            Chain chain = chainBuff.poll();
-            String pkSideAlias = chain.table.getAlias();
-            chainCount++;
-            tableChains.put(pkSideAlias, chain);
-
-            // this round pk side is next round's fk side
-            if (fkJoinMap.containsKey(pkSideAlias)) {
-                for (JoinDesc join : fkJoinMap.get(pkSideAlias)) {
-                    chainBuff.add(new Chain(join.getPKSide(), join, chain));
-                }
-            }
-        }
-
-        // if join count not match (chain count - 1), there must be some join not take effect
-        if (joinCount != (chainCount - 1)) {
-            throw new IllegalArgumentException("There's some illegal Joins, please check your model");
+            TableRef pkSide = join.getPKSide();
+            Chain fkSide = tableChains.get(join.getFKSide().getAlias());
+            tableChains.put(pkSide.getAlias(), new Chain(pkSide, join, fkSide));
         }
     }
 


[09/21] kylin git commit: KYLIN-2514 make left and inner model align

Posted by li...@apache.org.
KYLIN-2514 make left and inner model align

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8b70fa52
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8b70fa52
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8b70fa52

Branch: refs/heads/KYLIN-2501
Commit: 8b70fa5210c7c2f95086dd2311d373277161d4ed
Parents: 14b96a8
Author: Roger Shi <ro...@hotmail.com>
Authored: Fri Mar 17 22:19:16 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 17 23:28:15 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/metadata/model/JoinsTree.java  |  3 ++-
 .../model_desc/ci_inner_join_model.json         | 28 ++++++++++----------
 2 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8b70fa52/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index 3c876a0..224788c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.metadata.model;
 
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -59,7 +60,7 @@ public class JoinsTree  implements Serializable {
         }
 
         // Width-first build tree (tableChains)
-        Queue<Chain> chainBuff = Queues.newArrayDeque();
+        Queue<Chain> chainBuff = new ArrayDeque<Chain>();
         chainBuff.add(new Chain(rootTable, null, null));
         int chainCount = 0;
         while (!chainBuff.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/8b70fa52/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json b/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json
index 19cf721..b79d293 100644
--- a/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json
+++ b/examples/test_case_data/localmeta/model_desc/ci_inner_join_model.json
@@ -31,20 +31,6 @@
       }
     },
     {
-      "table": "DEFAULT.TEST_ACCOUNT",
-      "alias": "SELLER_ACCOUNT",
-      "kind": "FACT",
-      "join": {
-        "type": "INNER",
-        "primary_key": [
-          "SELLER_ACCOUNT.ACCOUNT_ID"
-        ],
-        "foreign_key": [
-          "TEST_KYLIN_FACT.SELLER_ID"
-        ]
-      }
-    },
-    {
       "table": "EDW.TEST_CAL_DT",
       "join": {
         "type": "INNER",
@@ -119,6 +105,20 @@
           "SELLER_ACCOUNT.ACCOUNT_COUNTRY"
         ]
       }
+    },
+    {
+      "table": "DEFAULT.TEST_ACCOUNT",
+      "alias": "SELLER_ACCOUNT",
+      "kind": "FACT",
+      "join": {
+        "type": "INNER",
+        "primary_key": [
+          "SELLER_ACCOUNT.ACCOUNT_ID"
+        ],
+        "foreign_key": [
+          "TEST_KYLIN_FACT.SELLER_ID"
+        ]
+      }
     }
   ],
   "dimensions": [


[06/21] kylin git commit: minor, job diag support different hadoop env

Posted by li...@apache.org.
minor, job diag support different hadoop env


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6c800d6e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6c800d6e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6c800d6e

Branch: refs/heads/KYLIN-2501
Commit: 6c800d6e825c36ebe677b19efd7f02a7e3b9509b
Parents: 2ab9bf2
Author: lidongsjtu <li...@apache.org>
Authored: Thu Mar 16 16:48:02 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Mar 16 17:22:31 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/tool/MrJobInfoExtractor.java   | 51 ++++++++++++++------
 1 file changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/6c800d6e/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index b9bf2de..ca4c7e1 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class MrJobInfoExtractor extends AbstractInfoExtractor {
@@ -59,6 +61,11 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
 
     private static final int HTTP_RETRY = 3;
 
+    private Map<String, String> nodeInfoMap = Maps.newHashMap();
+
+    private String jobHistoryUrlBase;
+    private String yarnMasterUrlBase;
+
     public MrJobInfoExtractor() {
         packageType = "MR";
 
@@ -71,14 +78,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         extractor.execute(args);
     }
 
-    private String getRestCheckUrl() {
+    private void extractRestCheckUrl() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
-        Pattern pattern = Pattern.compile("(http://)(.*):.*");
+        Pattern pattern = Pattern.compile("(http://)([^:]*):([^/])*.*");
         if (yarnStatusCheckUrl != null) {
             Matcher m = pattern.matcher(yarnStatusCheckUrl);
             if (m.matches()) {
-                return m.group(1) + m.group(2) + ":19888";
+                jobHistoryUrlBase = m.group(1) + m.group(2) + ":19888";
+                yarnMasterUrlBase = m.group(1) + m.group(2) + ":" + m.group(3);
             }
         }
         logger.info("kylin.engine.mr.yarn-check-status-url" + " is not set, read from hadoop configuration");
@@ -91,14 +99,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
         }
         if (StringUtils.isEmpty(rmWebHost)) {
-            return null;
+            return;
         }
         if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
             rmWebHost = "http://" + rmWebHost;
         }
         Matcher m = pattern.matcher(rmWebHost);
         Preconditions.checkArgument(m.matches(), "Yarn master URL not found.");
-        return m.group(1) + m.group(2) + ":19888";
+        yarnMasterUrlBase = rmWebHost;
+        jobHistoryUrlBase = m.group(1) + HAUtil.getConfValueForRMInstance("mapreduce.jobhistory.webapp.address", m.group(2) + ":19888", conf);
     }
 
     private String getHttpResponse(String url) {
@@ -122,7 +131,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         return msg;
     }
 
-    private void extractTaskDetail(String taskId, String nodeId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
+    private void extractTaskDetail(String taskId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
         try {
             if (StringUtils.isEmpty(taskId)) {
                 return;
@@ -137,8 +146,9 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             String succAttemptId = taskAttempt.textValue();
 
             String attemptInfo = saveHttpResponseQuietly(new File(destDir, "task_attempts.json"), taskUrlBase + "/attempts/" + succAttemptId);
-            JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt").path("assignedContainerId");
-            String containerId = attemptAttempt.textValue();
+            JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt");
+            String containerId = attemptAttempt.get("assignedContainerId").textValue();
+            String nodeId = nodeInfoMap.get(attemptAttempt.get("nodeHttpAddress").textValue());
 
             // save task counters
             saveHttpResponseQuietly(new File(destDir, "task_counters.json"), taskUrlBase + "/counters");
@@ -173,16 +183,25 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         try {
             boolean includeTaskDetails = optionsHelper.hasOption(OPTION_INCLUDE_DETAILS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_DETAILS)) : true;
             String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
-            String jobUrlBase = getRestCheckUrl();
-            String jobUrlPrefix = jobUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+            extractRestCheckUrl();
+
+            Preconditions.checkNotNull(jobHistoryUrlBase);
+            Preconditions.checkNotNull(yarnMasterUrlBase);
+
+            String jobUrlPrefix = jobHistoryUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+
+            // cache node info
+            String nodeUrl = yarnMasterUrlBase + "/ws/v1/cluster/nodes";
+            String nodeResponse = getHttpResponse(nodeUrl);
+            JsonNode nodes = new ObjectMapper().readTree(nodeResponse).path("nodes").path("node");
+            for (JsonNode node : nodes) {
+                nodeInfoMap.put(node.path("nodeHTTPAddress").textValue(), node.path("id").textValue());
+            }
 
             // save mr job stats
             String jobResponse = saveHttpResponseQuietly(new File(exportDir, "job.json"), jobUrlPrefix);
             String user = new ObjectMapper().readTree(jobResponse).path("job").path("user").textValue();
 
-            String jobAttemptResponse = saveHttpResponseQuietly(new File(exportDir, "job_attempts.json"), jobUrlPrefix + "/jobattempts");
-            String nodeId = new ObjectMapper().readTree(jobAttemptResponse).path("jobAttempts").path("jobAttempt").get(0).path("nodeId").textValue();
-
             // save mr job conf
             saveHttpResponseQuietly(new File(exportDir, "job_conf.json"), jobUrlPrefix + "/conf");
 
@@ -191,7 +210,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
 
             // save task details
             if (includeTaskDetails) {
-                extractTaskDetails(exportDir, jobUrlPrefix, jobUrlBase, nodeId, user);
+                extractTaskDetails(exportDir, jobUrlPrefix, jobHistoryUrlBase, user);
             }
 
         } catch (Exception e) {
@@ -199,7 +218,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         }
     }
 
-    private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String nodeId, String user) {
+    private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String user) {
         try {
             String tasksUrl = jobUrlPrefix + "/tasks/";
             String tasksResponse = saveHttpResponseQuietly(new File(exportDir, "job_tasks.json"), tasksUrl);
@@ -324,7 +343,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             File tasksDir = new File(exportDir, "tasks");
             FileUtils.forceMkdir(tasksDir);
             for (String taskId : selectedTaskIds) {
-                extractTaskDetail(taskId, nodeId, user, tasksDir, tasksUrl, jobUrlBase);
+                extractTaskDetail(taskId, user, tasksDir, tasksUrl, jobUrlBase);
             }
         } catch (Exception e) {
             logger.warn("Failed to get mr tasks rest response.", e);


[03/21] kylin git commit: KYLIN-2504 keep the engine_type property when clone cube

Posted by li...@apache.org.
KYLIN-2504 keep the engine_type property when clone cube


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dde297e4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dde297e4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dde297e4

Branch: refs/heads/KYLIN-2501
Commit: dde297e48a6886f662cd153c6f834f7e26560522
Parents: 32034fc
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 15 22:25:51 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 15 22:25:51 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/rest/controller/CubeController.java    | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/dde297e4/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index da44aec..8cdfcfb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -361,10 +360,7 @@ public class CubeController extends BasicController {
         CubeDesc cubeDesc = cube.getDescriptor();
         CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
 
-        KylinConfig config = cubeService.getConfig();
         newCubeDesc.setName(newCubeName);
-        newCubeDesc.setEngineType(config.getDefaultCubeEngine());
-        newCubeDesc.setStorageType(config.getDefaultStorageEngine());
 
         CubeInstance newCube;
         try {


[13/21] kylin git commit: KYLIN-2434 support config kylin.source.hive.database-for-flat-table in Spark cubing

Posted by li...@apache.org.
KYLIN-2434 support config kylin.source.hive.database-for-flat-table in Spark cubing


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3ae6f9c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3ae6f9c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3ae6f9c9

Branch: refs/heads/KYLIN-2501
Commit: 3ae6f9c9120360cdc9fc238a2a9208fa9813aea6
Parents: 98664f0
Author: shaofengshi <sh...@apache.org>
Authored: Sat Mar 18 10:39:57 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Mar 19 09:47:47 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/spark/SparkBatchCubingJobBuilder2.java     | 2 +-
 .../org/apache/kylin/engine/spark/SparkCubingByLayer.java   | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3ae6f9c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index e0b3e6c..66b154d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,7 +47,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3ae6f9c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f7ed2d0..f70fd30 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -77,22 +77,23 @@ import java.util.List;
 
 
 /**
+ * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
 public class SparkCubingByLayer extends AbstractApplication implements Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class);
 
-    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
     public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
 
     private Options options;
 
     public SparkCubingByLayer() {
         options = new Options();
-        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_INPUT_TABLE);
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_CONF_PATH);
@@ -134,7 +135,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
-        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
@@ -154,7 +155,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
 
         HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame intermediateTable = sqlContext.table(envConfig.getHiveDatabaseForIntermediateTable() + "." + hiveTable);
+        final DataFrame intermediateTable = sqlContext.table(hiveTable);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();