You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/18 09:53:16 UTC

[kylin] 06/08: KYLIN-3362 support dynamic dimension push down

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3359
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e0f34c9d0facb10b415290edb22737c495fa8c73
Author: Zhong <nj...@apache.org>
AuthorDate: Sun May 13 15:45:07 2018 +0800

    KYLIN-3362 support dynamic dimension push down
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../kylin/cube/gridtable/CubeCodeSystem.java       | 13 +++--
 .../apache/kylin/cube/gridtable/CubeGridTable.java |  4 +-
 .../gridtable/CuboidToGridTableMappingExt.java     | 21 +++++++-
 .../java/org/apache/kylin/gridtable/GTInfo.java    | 17 ++++++
 .../metadata/datatype/DynamicDimSerializer.java    | 63 ++++++++++++++++++++++
 .../kylin/metadata/realization/SQLDigest.java      |  9 +++-
 .../storage/gtrecord/CubeScanRangePlanner.java     | 12 ++++-
 .../kylin/storage/gtrecord/CubeSegmentScanner.java | 17 +++---
 .../storage/gtrecord/GTCubeStorageQueryBase.java   | 33 ++++++++----
 .../gtrecord/GTCubeStorageQueryRequest.java        | 16 +++++-
 .../gtrecord/SequentialCubeTupleIterator.java      | 11 ++--
 .../apache/kylin/storage/hbase/ITStorageTest.java  |  2 +
 .../kylin/query/relnode/OLAPAggregateRel.java      | 37 ++++++++++---
 .../apache/kylin/query/relnode/OLAPContext.java    | 11 +++-
 14 files changed, 227 insertions(+), 39 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index 9eae6f3..3577476 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -34,6 +34,7 @@ import org.apache.kylin.gridtable.IGTCodeSystem;
 import org.apache.kylin.gridtable.IGTComparator;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.datatype.DynamicDimSerializer;
 
 /**
  * defines how column values will be encoded to/ decoded from GTRecord 
@@ -68,7 +69,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
     @Override
     public void init(GTInfo info) {
         this.info = info;
-
+        ImmutableBitSet dDims = info.getDynamicDims();
         this.serializers = new DataTypeSerializer[info.getColumnCount()];
         for (int i = 0; i < serializers.length; i++) {
             DimensionEncoding dimEnc = i < dimEncs.length ? dimEncs[i] : null;
@@ -77,8 +78,14 @@ public class CubeCodeSystem implements IGTCodeSystem {
                 // for dimensions
                 serializers[i] = dimEnc.asDataTypeSerializer();
             } else {
-                // for measures
-                serializers[i] = DataTypeSerializer.create(info.getColumnType(i));
+                DataTypeSerializer dSerializer = DataTypeSerializer.create(info.getColumnType(i));
+                if (dDims != null && dDims.get(i)) {
+                    // for dynamic dimensions
+                    dSerializer = new DynamicDimSerializer(dSerializer);
+                } else {
+                    // for measures
+                }
+                serializers[i] = dSerializer;
             }
         }
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index 79732e8..2a819bc 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -44,7 +44,9 @@ public class CubeGridTable {
         builder.setColumns(mapping.getDataTypes());
         builder.setPrimaryKey(mapping.getPrimaryKey());
         builder.enableColumnBlock(mapping.getColumnBlocks());
-
+        if (mapping instanceof CuboidToGridTableMappingExt) {
+            builder.enableDynamicDims(((CuboidToGridTableMappingExt) mapping).getDynamicDims());
+        }
         return builder.build();
     }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java
index fbdd07e..32c4ca0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java
@@ -35,8 +35,11 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping {
+    private final List<TblColRef> dynDims;
     private final List<DynamicFunctionDesc> dynFuncs;
 
+    private ImmutableBitSet dynamicDims;
+
     private List<DataType> dynGtDataTypes;
     private List<ImmutableBitSet> dynGtColBlocks;
 
@@ -44,8 +47,9 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping {
 
     private Map<FunctionDesc, Integer> dynMetrics2gt;
 
-    public CuboidToGridTableMappingExt(Cuboid cuboid, List<DynamicFunctionDesc> dynFuncs) {
+    public CuboidToGridTableMappingExt(Cuboid cuboid, List<TblColRef> dynDims, List<DynamicFunctionDesc> dynFuncs) {
         super(cuboid);
+        this.dynDims = dynDims;
         this.dynFuncs = dynFuncs;
         init();
     }
@@ -59,6 +63,15 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping {
         int gtColIdx = super.getColumnCount();
 
         BitSet rtColBlock = new BitSet();
+        // dynamic dimensions
+        for (TblColRef rtDim : dynDims) {
+            dynDim2gt.put(rtDim, gtColIdx);
+            dynGtDataTypes.add(rtDim.getType());
+            rtColBlock.set(gtColIdx);
+            gtColIdx++;
+        }
+        dynamicDims = new ImmutableBitSet(rtColBlock);
+
         // dynamic metrics
         for (DynamicFunctionDesc rtFunc : dynFuncs) {
             dynMetrics2gt.put(rtFunc, gtColIdx);
@@ -70,9 +83,13 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping {
         dynGtColBlocks.add(new ImmutableBitSet(rtColBlock));
     }
 
+    public ImmutableBitSet getDynamicDims() {
+        return dynamicDims;
+    }
+
     @Override
     public int getColumnCount() {
-        return super.getColumnCount() + dynMetrics2gt.size();
+        return super.getColumnCount() + dynDims.size() + dynFuncs.size();
     }
 
     @Override
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index d10d6e7..739adf8 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -57,6 +57,9 @@ public class GTInfo {
     int rowBlockSize; // 0: disable row block
     ImmutableBitSet colBlocksAll;
 
+    // not included during serialization, only used for loadColumns
+    ImmutableBitSet dynamicDims;
+
     // must create from builder
     private GTInfo() {
     }
@@ -93,6 +96,10 @@ public class GTInfo {
         return colAll;
     }
 
+    public ImmutableBitSet getDynamicDims() {
+        return dynamicDims;
+    }
+
     public boolean isRowBlockEnabled() {
         return rowBlockSize > 0;
     }
@@ -214,6 +221,10 @@ public class GTInfo {
                 it.remove();
         }
         colBlocks = list.toArray(new ImmutableBitSet[list.size()]);
+
+        // for dynamic dimensions
+        if (dynamicDims == null)
+            dynamicDims = ImmutableBitSet.EMPTY;
     }
 
     public static class Builder {
@@ -269,6 +280,12 @@ public class GTInfo {
             return this;
         }
 
+        /** optional */
+        public Builder enableDynamicDims(ImmutableBitSet dynamicDims) {
+            info.dynamicDims = dynamicDims;
+            return this;
+        }
+
         public GTInfo build() {
             info.validate();
             return info;
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java
new file mode 100644
index 0000000..a1c42a8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+/**
+ * For dynamic dimensions, the code length must be fixed
+ */
+public class DynamicDimSerializer<T> extends DataTypeSerializer<T> {
+
+    private final DataTypeSerializer<T> dimDataTypeSerializer;
+
+    public DynamicDimSerializer(DataTypeSerializer<T> dimDataTypeSerializer) {
+        this.dimDataTypeSerializer = dimDataTypeSerializer;
+    }
+
+    public void serialize(T value, ByteBuffer out) {
+        dimDataTypeSerializer.serialize(value, out);
+    }
+
+    public T deserialize(ByteBuffer in) {
+        return dimDataTypeSerializer.deserialize(in);
+    }
+
+    public int peekLength(ByteBuffer in) {
+        return maxLength();
+    }
+
+    public int maxLength() {
+        return dimDataTypeSerializer.maxLength();
+    }
+
+    public int getStorageBytesEstimate() {
+        return dimDataTypeSerializer.getStorageBytesEstimate();
+    }
+
+    /** An optional convenience method that converts a string to this data type (for dimensions) */
+    public T valueOf(String str) {
+        return dimDataTypeSerializer.valueOf(str);
+    }
+
+    /** Convert from obj to string */
+    public String toString(T value) {
+        return dimDataTypeSerializer.toString(value);
+    }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index 45ba95a..0b23e48 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -19,8 +19,10 @@
 package org.apache.kylin.metadata.realization;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.DynamicFunctionDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -57,6 +59,8 @@ public class SQLDigest {
     public List<TblColRef> groupbyColumns;
     public Set<TblColRef> subqueryJoinParticipants;
 
+    public Map<TblColRef, TupleExpression> dynGroupbyColumns;
+
     // aggregation
     public Set<TblColRef> metricColumns;
     public List<FunctionDesc> aggregations; // storage level measure type, on top of which various sql aggr function may apply
@@ -80,7 +84,8 @@ public class SQLDigest {
     public Set<MeasureDesc> involvedMeasure;
 
     public SQLDigest(String factTable, Set<TblColRef> allColumns, List<JoinDesc> joinDescs, // model
-            List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants, // group by
+            List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants,
+            Map<TblColRef, TupleExpression> dynGroupByColumns, // group by
             Set<TblColRef> metricColumns, List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, // aggregation
             List<DynamicFunctionDesc> dynAggregations, //
             Set<TblColRef> rtDimensionColumns, Set<TblColRef> rtMetricColumns, // dynamic col related columns
@@ -95,6 +100,8 @@ public class SQLDigest {
         this.groupbyColumns = groupbyColumns;
         this.subqueryJoinParticipants = subqueryJoinParticipants;
 
+        this.dynGroupbyColumns = dynGroupByColumns;
+
         this.metricColumns = metricColumns;
         this.aggregations = aggregations;
         this.aggrSqlCalls = aggrSqlCalls;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index d0f2ca2..f99c868 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTComparator;
+import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.DynamicFunctionDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -75,7 +76,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
     protected Cuboid cuboid;
 
     public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, //
-            Set<TblColRef> groupByDims, //
+            Set<TblColRef> groupByDims, List<TblColRef> dynGroupsDims, List<TupleExpression> dynGroupExprs, //
             Collection<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, //
             TupleFilter havingFilter, StorageContext context) {
         this.context = context;
@@ -102,6 +103,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
 
         //replace the constant values in filter to dictionary codes
         Set<TblColRef> groupByPushDown = Sets.newHashSet(groupByDims);
+        groupByPushDown.addAll(dynGroupsDims);
         this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getDim2gt(), groupByPushDown);
         this.havingFilter = havingFilter;
 
@@ -112,10 +114,16 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
 
         // for dynamic cols, which are as appended columns to GTInfo
         BitSet tmpGtDynCols = new BitSet();
+        tmpGtDynCols.or(mapping.makeGridTableColumns(Sets.newHashSet(dynGroupsDims)).mutable());
         tmpGtDynCols.or(mapping.makeGridTableColumns(dynFuncs).mutable());
         this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols);
 
-        this.tupleExpressionList = Lists.newArrayListWithExpectedSize(dynFuncs.size());
+        this.tupleExpressionList = Lists.newArrayListWithExpectedSize(dynGroupExprs.size() + dynFuncs.size());
+        // for dynamic dimensions
+        for (TupleExpression rtGroupExpr : dynGroupExprs) {
+            this.tupleExpressionList
+                    .add(GTUtil.convertFilterColumnsAndConstants(rtGroupExpr, gtInfo, mapping, groupByPushDown));
+        }
 
         // for dynamic measures
         Set<FunctionDesc> tmpRtAggrMetrics = Sets.newHashSet();
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index d8b245c..95ffa35 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -31,6 +31,7 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.StringCodeSystem;
 import org.apache.kylin.metadata.filter.TupleFilter;
@@ -53,12 +54,12 @@ public class CubeSegmentScanner implements IGTScanner {
     final GTScanRequest scanRequest;
 
     public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, //
-            Set<TblColRef> groups, //
+            Set<TblColRef> groups, List<TblColRef> dynGroups, List<TupleExpression> dynGroupExprs, //
             Collection<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, //
             TupleFilter originalfilter, TupleFilter havingFilter, StorageContext context) {
-        
+
         logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName());
-        
+
         this.cuboid = cuboid;
         this.cubeSeg = cubeSeg;
 
@@ -74,20 +75,20 @@ public class CubeSegmentScanner implements IGTScanner {
 
         CubeScanRangePlanner scanRangePlanner;
         try {
-            scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, dynFuncs,
-                    havingFilter, context);
+            scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, dynGroups,
+                    dynGroupExprs, metrics, dynFuncs, havingFilter, context);
         } catch (RuntimeException e) {
             throw e;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        
+
         scanRequest = scanRangePlanner.planScanRequest();
-        
+
         String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
         scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
     }
-    
+
     public boolean isSegmentSkipped() {
         return scanner.isSegmentSkipped();
     }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 728bb46..b9b10c2 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -42,6 +42,7 @@ import org.apache.kylin.dict.lookup.LookupStringTable;
 import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.bitmap.BitmapMeasureType;
+import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.filter.CaseTupleFilter;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -93,7 +94,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
                 continue;
             }
 
-            scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), //
+            scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
+                    request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
                     request.getMetrics(), request.getDynFuncs(), //
                     request.getFilter(), request.getHavingFilter(), request.getContext());
             if (!scanner.isSegmentSkipped())
@@ -104,7 +106,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             return ITupleIterator.EMPTY_TUPLE_ITERATOR;
 
         return new SequentialCubeTupleIterator(scanners, request.getCuboid(), request.getDimensions(),
-                request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest);
+                request.getDynGroups(), request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest);
     }
 
     public GTCubeStorageQueryRequest getStorageQueryRequest(StorageContext context, SQLDigest sqlDigest,
@@ -144,13 +146,19 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
         // set cuboid to GridTable mapping;
         boolean noDynamicCols;
-
+        // dynamic dimensions
+        List<TblColRef> dynGroups = Lists.newArrayList(sqlDigest.dynGroupbyColumns.keySet());
+        noDynamicCols = dynGroups.isEmpty();
+        List<TupleExpression> dynGroupExprs = Lists.newArrayListWithExpectedSize(sqlDigest.dynGroupbyColumns.size());
+        for (TblColRef dynGroupCol : dynGroups) {
+            dynGroupExprs.add(sqlDigest.dynGroupbyColumns.get(dynGroupCol));
+        }
         // dynamic measures
         List<DynamicFunctionDesc> dynFuncs = sqlDigest.dynAggregations;
-        noDynamicCols = dynFuncs.isEmpty();
+        noDynamicCols = noDynamicCols && dynFuncs.isEmpty();
 
         CuboidToGridTableMapping mapping = noDynamicCols ? new CuboidToGridTableMapping(cuboid)
-                : new CuboidToGridTableMappingExt(cuboid, dynFuncs);
+                : new CuboidToGridTableMappingExt(cuboid, dynGroups, dynFuncs);
         context.setMapping(mapping);
 
         // set whether to aggr at storage
@@ -171,8 +179,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         context.setFilterMask(getQueryFilterMask(filterColumnD));
 
         // set limit push down
-        enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filterD, loosenedColumnD,
-                sqlDigest.aggregations, context);
+        enableStorageLimitIfPossible(cuboid, groups, dynGroups, derivedPostAggregation, groupsD, filterD,
+                loosenedColumnD, sqlDigest.aggregations, context);
         // set whether to aggregate results from multiple partitions
         enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
         // check query deadline
@@ -187,8 +195,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
                 cubeInstance.getName(), cuboid.getId(), groupsD, filterColumnD, context.getFinalPushDownLimit(),
                 context.getStorageLimitLevel(), context.isNeedStorageAggregation());
 
-        return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, dynFuncs, filterD,
-                havingFilter, context);
+        return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, dynGroups, dynGroupExprs, filterColumnD,
+                metrics, dynFuncs, filterD, havingFilter, context);
     }
 
     protected abstract String getGTStorage();
@@ -408,7 +416,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
-    private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups,
+    private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, List<TblColRef> dynGroups,
             Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter,
             Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) {
 
@@ -424,6 +432,11 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
                             + " with cuboid columns: " + cuboid.getColumns());
         }
 
+        if (!dynGroups.isEmpty()) {
+            storageLimitLevel = StorageLimitLevel.NO_LIMIT;
+            logger.debug("Storage limit push down is impossible because the query has dynamic groupby " + dynGroups);
+        }
+
         // derived aggregation is bad, unless expanded columns are already in group by
         if (!groups.containsAll(derivedPostAggregation)) {
             storageLimitLevel = StorageLimitLevel.NO_LIMIT;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
index c66e813..fdc976e 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.DynamicFunctionDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -36,17 +37,22 @@ public class GTCubeStorageQueryRequest implements Serializable {
     private Set<TblColRef> groups;
     private Set<TblColRef> filterCols;
     private Set<FunctionDesc> metrics;
+    private List<TblColRef> dynGroups;
+    private List<TupleExpression> dynGroupExprs;
     private List<DynamicFunctionDesc> dynFuncs;
     private TupleFilter filter;
     private TupleFilter havingFilter;
     private StorageContext context;
 
-    public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
+    public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, //
+            Set<TblColRef> groups, List<TblColRef> dynGroups, List<TupleExpression> dynGroupExprs, //
             Set<TblColRef> filterCols, Set<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, //
             TupleFilter filter, TupleFilter havingFilter, StorageContext context) {
         this.cuboid = cuboid;
         this.dimensions = dimensions;
         this.groups = groups;
+        this.dynGroups = dynGroups;
+        this.dynGroupExprs = dynGroupExprs;
         this.filterCols = filterCols;
         this.metrics = metrics;
         this.dynFuncs = dynFuncs;
@@ -79,6 +85,14 @@ public class GTCubeStorageQueryRequest implements Serializable {
         this.groups = groups;
     }
 
+    public List<TblColRef> getDynGroups() {
+        return dynGroups;
+    }
+
+    public List<TupleExpression> getDynGroupExprs() {
+        return dynGroupExprs;
+    }
+
     public Set<FunctionDesc> getMetrics() {
         return metrics;
     }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index c067e33..b8dff8b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
+import com.google.common.collect.Sets;
 import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -53,14 +54,18 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
     private int scanCount;
     private int scanCountDelta;
 
-    public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
-            Set<TblColRef> groups, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, SQLDigest sqlDigest) {
+    public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid,
+            Set<TblColRef> selectedDimensions, List<TblColRef> rtGroups, Set<TblColRef> groups, //
+            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, SQLDigest sqlDigest) {
         this.context = context;
         this.scanners = scanners;
 
+        Set<TblColRef> selectedDims = Sets.newHashSet(selectedDimensions);
+        selectedDims.addAll(rtGroups);
+
         segmentCubeTupleIterators = Lists.newArrayList();
         for (CubeSegmentScanner scanner : scanners) {
-            segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
+            segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDims, selectedMetrics, returnTupleInfo, context));
         }
 
         if (context.mergeSortPartitionResults() && !sqlDigest.isRawQuery) {
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index c432c12..61aa560 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -29,6 +29,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.DynamicFunctionDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -140,6 +141,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
         try {
             SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", /*allCol*/ Collections.<TblColRef> emptySet(), /*join*/ null, //
                     groups, /*subqueryJoinParticipants*/ Sets.<TblColRef> newHashSet(), //
+                    /*dynamicGroupByColumns*/ Collections.<TblColRef, TupleExpression> emptyMap(), //
                     /*metricCol*/ Collections.<TblColRef> emptySet(), aggregations, /*aggrSqlCalls*/ Collections.<SQLCall> emptyList(), //
                     /*dynamicAggregations*/ Collections.<DynamicFunctionDesc> emptyList(), //
                     /*runtimeDimensionColumns*/ Collections.<TblColRef> emptySet(), //
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 0eff905..84f7676 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -266,12 +266,37 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         this.groups = Lists.newArrayList();
         for (int i = getGroupSet().nextSetBit(0); i >= 0; i = getGroupSet().nextSetBit(i + 1)) {
             TupleExpression tupleExpression = inputColumnRowType.getSourceColumnsByIndex(i);
-            Set<TblColRef> srcCols = ExpressionColCollector.collectColumns(tupleExpression);
-            // if no source columns, use target column instead
-            if (srcCols.isEmpty()) {
-                srcCols.add(inputColumnRowType.getColumnByIndex(i));
+            if (tupleExpression instanceof ColumnTupleExpression) {
+                this.groups.add(((ColumnTupleExpression) tupleExpression).getColumn());
+            } else {
+                TblColRef groupOutCol = inputColumnRowType.getColumnByIndex(i);
+                Pair<Set<TblColRef>, Set<TblColRef>> cols = ExpressionColCollector.collectColumnsPair(tupleExpression);
+
+                // push down only available for the innermost aggregation
+                boolean ifPushDown = !afterAggregate;
+
+                // if measure columns exist, don't do push down
+                if (!cols.getSecond().isEmpty()) {
+                    ifPushDown = false;
+                }
+
+                // if any of the dimensions is a derived column, don't do push down
+                for (TblColRef dimCol : cols.getFirst()) {
+                    if (!this.context.belongToFactTableDims(dimCol)) {
+                        ifPushDown = false;
+                        break;
+                    }
+                }
+
+                if (ifPushDown) {
+                    this.groups.add(groupOutCol);
+                    this.context.dynGroupBy.put(groupOutCol, tupleExpression);
+                } else {
+                    this.groups.addAll(cols.getFirst());
+                    this.groups.addAll(cols.getSecond());
+                    this.context.dynamicFields.remove(groupOutCol);
+                }
             }
-            this.groups.addAll(srcCols);
         }
     }
 
@@ -321,7 +346,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
                 } else if (aggCall.getAggregation() instanceof SqlCountAggFunction && !aggCall.isDistinct()) {
                     if (tupleExpr instanceof ColumnTupleExpression) {
                         TblColRef srcCol = ((ColumnTupleExpression) tupleExpr).getColumn();
-                        if (this.context.belongToFactTable(srcCol)) {
+                        if (this.context.belongToFactTableDims(srcCol)) {
                             tupleExpr = getCountColumnExpression(srcCol);
 
                             TblColRef column = TblColRef.newInnerColumn(tupleExpr.getDigest(),
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 20533ad..f3dcd1b 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -33,6 +33,8 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.expression.ExpressionColCollector;
+import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -158,6 +160,8 @@ public class OLAPContext {
     // dynamic columns info, note that the name of TblColRef will be the field name
     public Map<TblColRef, RelDataType> dynamicFields = new HashMap<>();
 
+    public Map<TblColRef, TupleExpression> dynGroupBy = new HashMap<>();
+
     // hive query
     public String sql = "";
 
@@ -172,6 +176,9 @@ public class OLAPContext {
     public SQLDigest getSQLDigest() {
         if (sqlDigest == null) {
             Set<TblColRef> rtDimColumns = new HashSet<>();
+            for (TupleExpression tupleExpr : dynGroupBy.values()) {
+                rtDimColumns.addAll(ExpressionColCollector.collectColumns(tupleExpr));
+            }
             Set<TblColRef> rtMetricColumns = new HashSet<>();
             List<DynamicFunctionDesc> dynFuncs = Lists.newLinkedList();
             for (FunctionDesc functionDesc : aggregations) {
@@ -183,7 +190,7 @@ public class OLAPContext {
                 }
             }
             sqlDigest = new SQLDigest(firstTableScan.getTableName(), allColumns, joins, // model
-                    groupByColumns, subqueryJoinParticipants, // group by
+                    groupByColumns, subqueryJoinParticipants, dynGroupBy, // group by
                     metricsColumns, aggregations, aggrSqlCalls, dynFuncs, // aggregation
                     rtDimColumns, rtMetricColumns, // runtime related columns
                     filterColumns, filter, havingFilter, // filter
@@ -211,7 +218,7 @@ public class OLAPContext {
         return false;
     }
 
-    public boolean belongToFactTable(TblColRef tblColRef) {
+    public boolean belongToFactTableDims(TblColRef tblColRef) {
         if (!belongToContextTables(tblColRef)) {
             return false;
         }

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.