You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/18 09:53:16 UTC
[kylin] 06/08: KYLIN-3362 support dynamic dimension push down
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch KYLIN-3359
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit e0f34c9d0facb10b415290edb22737c495fa8c73
Author: Zhong <nj...@apache.org>
AuthorDate: Sun May 13 15:45:07 2018 +0800
KYLIN-3362 support dynamic dimension push down
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../kylin/cube/gridtable/CubeCodeSystem.java | 13 +++--
.../apache/kylin/cube/gridtable/CubeGridTable.java | 4 +-
.../gridtable/CuboidToGridTableMappingExt.java | 21 +++++++-
.../java/org/apache/kylin/gridtable/GTInfo.java | 17 ++++++
.../metadata/datatype/DynamicDimSerializer.java | 63 ++++++++++++++++++++++
.../kylin/metadata/realization/SQLDigest.java | 9 +++-
.../storage/gtrecord/CubeScanRangePlanner.java | 12 ++++-
.../kylin/storage/gtrecord/CubeSegmentScanner.java | 17 +++---
.../storage/gtrecord/GTCubeStorageQueryBase.java | 33 ++++++++----
.../gtrecord/GTCubeStorageQueryRequest.java | 16 +++++-
.../gtrecord/SequentialCubeTupleIterator.java | 11 ++--
.../apache/kylin/storage/hbase/ITStorageTest.java | 2 +
.../kylin/query/relnode/OLAPAggregateRel.java | 37 ++++++++++---
.../apache/kylin/query/relnode/OLAPContext.java | 11 +++-
14 files changed, 227 insertions(+), 39 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index 9eae6f3..3577476 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -34,6 +34,7 @@ import org.apache.kylin.gridtable.IGTCodeSystem;
import org.apache.kylin.gridtable.IGTComparator;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.datatype.DynamicDimSerializer;
/**
* defines how column values will be encoded to/ decoded from GTRecord
@@ -68,7 +69,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
@Override
public void init(GTInfo info) {
this.info = info;
-
+ ImmutableBitSet dDims = info.getDynamicDims();
this.serializers = new DataTypeSerializer[info.getColumnCount()];
for (int i = 0; i < serializers.length; i++) {
DimensionEncoding dimEnc = i < dimEncs.length ? dimEncs[i] : null;
@@ -77,8 +78,14 @@ public class CubeCodeSystem implements IGTCodeSystem {
// for dimensions
serializers[i] = dimEnc.asDataTypeSerializer();
} else {
- // for measures
- serializers[i] = DataTypeSerializer.create(info.getColumnType(i));
+ DataTypeSerializer dSerializer = DataTypeSerializer.create(info.getColumnType(i));
+ if (dDims != null && dDims.get(i)) {
+ // for dynamic dimensions
+ dSerializer = new DynamicDimSerializer(dSerializer);
+ } else {
+ // for measures
+ }
+ serializers[i] = dSerializer;
}
}
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index 79732e8..2a819bc 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -44,7 +44,9 @@ public class CubeGridTable {
builder.setColumns(mapping.getDataTypes());
builder.setPrimaryKey(mapping.getPrimaryKey());
builder.enableColumnBlock(mapping.getColumnBlocks());
-
+ if (mapping instanceof CuboidToGridTableMappingExt) {
+ builder.enableDynamicDims(((CuboidToGridTableMappingExt) mapping).getDynamicDims());
+ }
return builder.build();
}
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java
index fbdd07e..32c4ca0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java
@@ -35,8 +35,11 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping {
+ private final List<TblColRef> dynDims;
private final List<DynamicFunctionDesc> dynFuncs;
+ private ImmutableBitSet dynamicDims;
+
private List<DataType> dynGtDataTypes;
private List<ImmutableBitSet> dynGtColBlocks;
@@ -44,8 +47,9 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping {
private Map<FunctionDesc, Integer> dynMetrics2gt;
- public CuboidToGridTableMappingExt(Cuboid cuboid, List<DynamicFunctionDesc> dynFuncs) {
+ public CuboidToGridTableMappingExt(Cuboid cuboid, List<TblColRef> dynDims, List<DynamicFunctionDesc> dynFuncs) {
super(cuboid);
+ this.dynDims = dynDims;
this.dynFuncs = dynFuncs;
init();
}
@@ -59,6 +63,15 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping {
int gtColIdx = super.getColumnCount();
BitSet rtColBlock = new BitSet();
+ // dynamic dimensions
+ for (TblColRef rtDim : dynDims) {
+ dynDim2gt.put(rtDim, gtColIdx);
+ dynGtDataTypes.add(rtDim.getType());
+ rtColBlock.set(gtColIdx);
+ gtColIdx++;
+ }
+ dynamicDims = new ImmutableBitSet(rtColBlock);
+
// dynamic metrics
for (DynamicFunctionDesc rtFunc : dynFuncs) {
dynMetrics2gt.put(rtFunc, gtColIdx);
@@ -70,9 +83,13 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping {
dynGtColBlocks.add(new ImmutableBitSet(rtColBlock));
}
+ public ImmutableBitSet getDynamicDims() {
+ return dynamicDims;
+ }
+
@Override
public int getColumnCount() {
- return super.getColumnCount() + dynMetrics2gt.size();
+ return super.getColumnCount() + dynDims.size() + dynFuncs.size();
}
@Override
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index d10d6e7..739adf8 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -57,6 +57,9 @@ public class GTInfo {
int rowBlockSize; // 0: disable row block
ImmutableBitSet colBlocksAll;
+ // not included during serialization, only used for loadColumns
+ ImmutableBitSet dynamicDims;
+
// must create from builder
private GTInfo() {
}
@@ -93,6 +96,10 @@ public class GTInfo {
return colAll;
}
+ public ImmutableBitSet getDynamicDims() {
+ return dynamicDims;
+ }
+
public boolean isRowBlockEnabled() {
return rowBlockSize > 0;
}
@@ -214,6 +221,10 @@ public class GTInfo {
it.remove();
}
colBlocks = list.toArray(new ImmutableBitSet[list.size()]);
+
+ // for dynamic dimensions
+ if (dynamicDims == null)
+ dynamicDims = ImmutableBitSet.EMPTY;
}
public static class Builder {
@@ -269,6 +280,12 @@ public class GTInfo {
return this;
}
+ /** optional */
+ public Builder enableDynamicDims(ImmutableBitSet dynamicDims) {
+ info.dynamicDims = dynamicDims;
+ return this;
+ }
+
public GTInfo build() {
info.validate();
return info;
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java
new file mode 100644
index 0000000..a1c42a8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Wraps the {@link DataTypeSerializer} of a dimension's data type so that the dimension can be
+ * stored as a dynamic dimension column of a grid table.
+ * <p>
+ * For dynamic dimensions, the code length must be fixed: {@link #peekLength(ByteBuffer)} always
+ * reports {@link #maxLength()} without reading the buffer. To honor that even when the wrapped
+ * serializer produces variable-length output, {@link #serialize(Object, ByteBuffer)} zero-pads
+ * every value to exactly {@link #maxLength()} bytes and {@link #deserialize(ByteBuffer)} skips
+ * the padding again.
+ */
+public class DynamicDimSerializer<T> extends DataTypeSerializer<T> {
+
+    private final DataTypeSerializer<T> dimDataTypeSerializer;
+
+    /**
+     * @param dimDataTypeSerializer serializer of the dimension's own data type; must not be null
+     */
+    public DynamicDimSerializer(DataTypeSerializer<T> dimDataTypeSerializer) {
+        this.dimDataTypeSerializer = Objects.requireNonNull(dimDataTypeSerializer, "dimDataTypeSerializer");
+    }
+
+    /**
+     * Writes {@code value} with the wrapped serializer, then appends zero bytes until exactly
+     * {@link #maxLength()} bytes have been written. This keeps the written length equal to what
+     * {@link #peekLength(ByteBuffer)} reports and makes equal values encode to identical bytes.
+     */
+    @Override
+    public void serialize(T value, ByteBuffer out) {
+        int start = out.position();
+        dimDataTypeSerializer.serialize(value, out);
+        int end = start + maxLength();
+        while (out.position() < end) {
+            out.put((byte) 0);
+        }
+    }
+
+    /**
+     * Reads a value with the wrapped serializer, then skips the zero padding written by
+     * {@link #serialize(Object, ByteBuffer)} so the buffer ends up right after the fixed-length
+     * code (never beyond the buffer's limit).
+     */
+    @Override
+    public T deserialize(ByteBuffer in) {
+        int start = in.position();
+        T value = dimDataTypeSerializer.deserialize(in);
+        int end = Math.min(start + maxLength(), in.limit());
+        if (in.position() < end) {
+            in.position(end);
+        }
+        return value;
+    }
+
+    /** Always {@link #maxLength()}: dynamic dimension codes are fixed-length, so the buffer is not read. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return maxLength();
+    }
+
+    /** The fixed code length, taken from the wrapped serializer's maximum serialized length. */
+    @Override
+    public int maxLength() {
+        return dimDataTypeSerializer.maxLength();
+    }
+
+    /** Every padded value occupies exactly {@link #maxLength()} bytes. */
+    @Override
+    public int getStorageBytesEstimate() {
+        return maxLength();
+    }
+
+    /** An optional convenient method that converts a string to this data type (for dimensions) */
+    @Override
+    public T valueOf(String str) {
+        return dimDataTypeSerializer.valueOf(str);
+    }
+
+    /** Convert from obj to string */
+    @Override
+    public String toString(T value) {
+        return dimDataTypeSerializer.toString(value);
+    }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index 45ba95a..0b23e48 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -19,8 +19,10 @@
package org.apache.kylin.metadata.realization;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.DynamicFunctionDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -57,6 +59,8 @@ public class SQLDigest {
public List<TblColRef> groupbyColumns;
public Set<TblColRef> subqueryJoinParticipants;
+ public Map<TblColRef, TupleExpression> dynGroupbyColumns;
+
// aggregation
public Set<TblColRef> metricColumns;
public List<FunctionDesc> aggregations; // storage level measure type, on top of which various sql aggr function may apply
@@ -80,7 +84,8 @@ public class SQLDigest {
public Set<MeasureDesc> involvedMeasure;
public SQLDigest(String factTable, Set<TblColRef> allColumns, List<JoinDesc> joinDescs, // model
- List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants, // group by
+ List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants,
+ Map<TblColRef, TupleExpression> dynGroupByColumns, // group by
Set<TblColRef> metricColumns, List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, // aggregation
List<DynamicFunctionDesc> dynAggregations, //
Set<TblColRef> rtDimensionColumns, Set<TblColRef> rtMetricColumns, // dynamic col related columns
@@ -95,6 +100,8 @@ public class SQLDigest {
this.groupbyColumns = groupbyColumns;
this.subqueryJoinParticipants = subqueryJoinParticipants;
+ this.dynGroupbyColumns = dynGroupByColumns;
+
this.metricColumns = metricColumns;
this.aggregations = aggregations;
this.aggrSqlCalls = aggrSqlCalls;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index d0f2ca2..f99c868 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GTUtil;
import org.apache.kylin.gridtable.IGTComparator;
+import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.DynamicFunctionDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -75,7 +76,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
protected Cuboid cuboid;
public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, //
- Set<TblColRef> groupByDims, //
+ Set<TblColRef> groupByDims, List<TblColRef> dynGroupsDims, List<TupleExpression> dynGroupExprs, //
Collection<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, //
TupleFilter havingFilter, StorageContext context) {
this.context = context;
@@ -102,6 +103,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
//replace the constant values in filter to dictionary codes
Set<TblColRef> groupByPushDown = Sets.newHashSet(groupByDims);
+ groupByPushDown.addAll(dynGroupsDims);
this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getDim2gt(), groupByPushDown);
this.havingFilter = havingFilter;
@@ -112,10 +114,16 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
// for dynamic cols, which are as appended columns to GTInfo
BitSet tmpGtDynCols = new BitSet();
+ tmpGtDynCols.or(mapping.makeGridTableColumns(Sets.newHashSet(dynGroupsDims)).mutable());
tmpGtDynCols.or(mapping.makeGridTableColumns(dynFuncs).mutable());
this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols);
- this.tupleExpressionList = Lists.newArrayListWithExpectedSize(dynFuncs.size());
+ this.tupleExpressionList = Lists.newArrayListWithExpectedSize(dynGroupExprs.size() + dynFuncs.size());
+ // for dynamic dimensions
+ for (TupleExpression rtGroupExpr : dynGroupExprs) {
+ this.tupleExpressionList
+ .add(GTUtil.convertFilterColumnsAndConstants(rtGroupExpr, gtInfo, mapping, groupByPushDown));
+ }
// for dynamic measures
Set<FunctionDesc> tmpRtAggrMetrics = Sets.newHashSet();
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index d8b245c..95ffa35 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -31,6 +31,7 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
import org.apache.kylin.metadata.filter.StringCodeSystem;
import org.apache.kylin.metadata.filter.TupleFilter;
@@ -53,12 +54,12 @@ public class CubeSegmentScanner implements IGTScanner {
final GTScanRequest scanRequest;
public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, //
- Set<TblColRef> groups, //
+ Set<TblColRef> groups, List<TblColRef> dynGroups, List<TupleExpression> dynGroupExprs, //
Collection<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, //
TupleFilter originalfilter, TupleFilter havingFilter, StorageContext context) {
-
+
logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName());
-
+
this.cuboid = cuboid;
this.cubeSeg = cubeSeg;
@@ -74,20 +75,20 @@ public class CubeSegmentScanner implements IGTScanner {
CubeScanRangePlanner scanRangePlanner;
try {
- scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, dynFuncs,
- havingFilter, context);
+ scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, dynGroups,
+ dynGroupExprs, metrics, dynFuncs, havingFilter, context);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
scanRequest = scanRangePlanner.planScanRequest();
-
+
String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
}
-
+
public boolean isSegmentSkipped() {
return scanner.isSegmentSkipped();
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 728bb46..b9b10c2 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -42,6 +42,7 @@ import org.apache.kylin.dict.lookup.LookupStringTable;
import org.apache.kylin.gridtable.StorageLimitLevel;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.bitmap.BitmapMeasureType;
+import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.filter.CaseTupleFilter;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -93,7 +94,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
continue;
}
- scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), //
+ scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
+ request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
request.getMetrics(), request.getDynFuncs(), //
request.getFilter(), request.getHavingFilter(), request.getContext());
if (!scanner.isSegmentSkipped())
@@ -104,7 +106,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return ITupleIterator.EMPTY_TUPLE_ITERATOR;
return new SequentialCubeTupleIterator(scanners, request.getCuboid(), request.getDimensions(),
- request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest);
+ request.getDynGroups(), request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest);
}
public GTCubeStorageQueryRequest getStorageQueryRequest(StorageContext context, SQLDigest sqlDigest,
@@ -144,13 +146,19 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
// set cuboid to GridTable mapping;
boolean noDynamicCols;
-
+ // dynamic dimensions
+ List<TblColRef> dynGroups = Lists.newArrayList(sqlDigest.dynGroupbyColumns.keySet());
+ noDynamicCols = dynGroups.isEmpty();
+ List<TupleExpression> dynGroupExprs = Lists.newArrayListWithExpectedSize(sqlDigest.dynGroupbyColumns.size());
+ for (TblColRef dynGroupCol : dynGroups) {
+ dynGroupExprs.add(sqlDigest.dynGroupbyColumns.get(dynGroupCol));
+ }
// dynamic measures
List<DynamicFunctionDesc> dynFuncs = sqlDigest.dynAggregations;
- noDynamicCols = dynFuncs.isEmpty();
+ noDynamicCols = noDynamicCols && dynFuncs.isEmpty();
CuboidToGridTableMapping mapping = noDynamicCols ? new CuboidToGridTableMapping(cuboid)
- : new CuboidToGridTableMappingExt(cuboid, dynFuncs);
+ : new CuboidToGridTableMappingExt(cuboid, dynGroups, dynFuncs);
context.setMapping(mapping);
// set whether to aggr at storage
@@ -171,8 +179,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
context.setFilterMask(getQueryFilterMask(filterColumnD));
// set limit push down
- enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filterD, loosenedColumnD,
- sqlDigest.aggregations, context);
+ enableStorageLimitIfPossible(cuboid, groups, dynGroups, derivedPostAggregation, groupsD, filterD,
+ loosenedColumnD, sqlDigest.aggregations, context);
// set whether to aggregate results from multiple partitions
enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
// check query deadline
@@ -187,8 +195,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
cubeInstance.getName(), cuboid.getId(), groupsD, filterColumnD, context.getFinalPushDownLimit(),
context.getStorageLimitLevel(), context.isNeedStorageAggregation());
- return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, dynFuncs, filterD,
- havingFilter, context);
+ return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, dynGroups, dynGroupExprs, filterColumnD,
+ metrics, dynFuncs, filterD, havingFilter, context);
}
protected abstract String getGTStorage();
@@ -408,7 +416,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
- private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups,
+ private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, List<TblColRef> dynGroups,
Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter,
Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) {
@@ -424,6 +432,11 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
+ " with cuboid columns: " + cuboid.getColumns());
}
+ if (!dynGroups.isEmpty()) {
+ storageLimitLevel = StorageLimitLevel.NO_LIMIT;
+ logger.debug("Storage limit push down is impossible because the query has dynamic groupby " + dynGroups);
+ }
+
// derived aggregation is bad, unless expanded columns are already in group by
if (!groups.containsAll(derivedPostAggregation)) {
storageLimitLevel = StorageLimitLevel.NO_LIMIT;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
index c66e813..fdc976e 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Set;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.DynamicFunctionDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -36,17 +37,22 @@ public class GTCubeStorageQueryRequest implements Serializable {
private Set<TblColRef> groups;
private Set<TblColRef> filterCols;
private Set<FunctionDesc> metrics;
+ private List<TblColRef> dynGroups;
+ private List<TupleExpression> dynGroupExprs;
private List<DynamicFunctionDesc> dynFuncs;
private TupleFilter filter;
private TupleFilter havingFilter;
private StorageContext context;
- public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
+ public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, //
+ Set<TblColRef> groups, List<TblColRef> dynGroups, List<TupleExpression> dynGroupExprs, //
Set<TblColRef> filterCols, Set<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, //
TupleFilter filter, TupleFilter havingFilter, StorageContext context) {
this.cuboid = cuboid;
this.dimensions = dimensions;
this.groups = groups;
+ this.dynGroups = dynGroups;
+ this.dynGroupExprs = dynGroupExprs;
this.filterCols = filterCols;
this.metrics = metrics;
this.dynFuncs = dynFuncs;
@@ -79,6 +85,14 @@ public class GTCubeStorageQueryRequest implements Serializable {
this.groups = groups;
}
+ public List<TblColRef> getDynGroups() {
+ return dynGroups;
+ }
+
+ public List<TupleExpression> getDynGroupExprs() {
+ return dynGroupExprs;
+ }
+
public Set<FunctionDesc> getMetrics() {
return metrics;
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index c067e33..b8dff8b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import com.google.common.collect.Sets;
import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -53,14 +54,18 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
private int scanCount;
private int scanCountDelta;
- public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
- Set<TblColRef> groups, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, SQLDigest sqlDigest) {
+ public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid,
+ Set<TblColRef> selectedDimensions, List<TblColRef> rtGroups, Set<TblColRef> groups, //
+ Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, SQLDigest sqlDigest) {
this.context = context;
this.scanners = scanners;
+ Set<TblColRef> selectedDims = Sets.newHashSet(selectedDimensions);
+ selectedDims.addAll(rtGroups);
+
segmentCubeTupleIterators = Lists.newArrayList();
for (CubeSegmentScanner scanner : scanners) {
- segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
+ segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDims, selectedMetrics, returnTupleInfo, context));
}
if (context.mergeSortPartitionResults() && !sqlDigest.isRawQuery) {
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index c432c12..61aa560 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -29,6 +29,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.DynamicFunctionDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -140,6 +141,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
try {
SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", /*allCol*/ Collections.<TblColRef> emptySet(), /*join*/ null, //
groups, /*subqueryJoinParticipants*/ Sets.<TblColRef> newHashSet(), //
+ /*dynamicGroupByColumns*/ Collections.<TblColRef, TupleExpression> emptyMap(), //
/*metricCol*/ Collections.<TblColRef> emptySet(), aggregations, /*aggrSqlCalls*/ Collections.<SQLCall> emptyList(), //
/*dynamicAggregations*/ Collections.<DynamicFunctionDesc> emptyList(), //
/*runtimeDimensionColumns*/ Collections.<TblColRef> emptySet(), //
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 0eff905..84f7676 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -266,12 +266,37 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
this.groups = Lists.newArrayList();
for (int i = getGroupSet().nextSetBit(0); i >= 0; i = getGroupSet().nextSetBit(i + 1)) {
TupleExpression tupleExpression = inputColumnRowType.getSourceColumnsByIndex(i);
- Set<TblColRef> srcCols = ExpressionColCollector.collectColumns(tupleExpression);
- // if no source columns, use target column instead
- if (srcCols.isEmpty()) {
- srcCols.add(inputColumnRowType.getColumnByIndex(i));
+ if (tupleExpression instanceof ColumnTupleExpression) {
+ this.groups.add(((ColumnTupleExpression) tupleExpression).getColumn());
+ } else {
+ TblColRef groupOutCol = inputColumnRowType.getColumnByIndex(i);
+ Pair<Set<TblColRef>, Set<TblColRef>> cols = ExpressionColCollector.collectColumnsPair(tupleExpression);
+
+ // push down only available for the innermost aggregation
+ boolean ifPushDown = !afterAggregate;
+
+ // if measure columns exist, don't do push down
+ if (!cols.getSecond().isEmpty()) {
+ ifPushDown = false;
+ }
+
+ // if existing a dimension which is a derived column, don't do push down
+ for (TblColRef dimCol : cols.getFirst()) {
+ if (!this.context.belongToFactTableDims(dimCol)) {
+ ifPushDown = false;
+ break;
+ }
+ }
+
+ if (ifPushDown) {
+ this.groups.add(groupOutCol);
+ this.context.dynGroupBy.put(groupOutCol, tupleExpression);
+ } else {
+ this.groups.addAll(cols.getFirst());
+ this.groups.addAll(cols.getSecond());
+ this.context.dynamicFields.remove(groupOutCol);
+ }
}
- this.groups.addAll(srcCols);
}
}
@@ -321,7 +346,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
} else if (aggCall.getAggregation() instanceof SqlCountAggFunction && !aggCall.isDistinct()) {
if (tupleExpr instanceof ColumnTupleExpression) {
TblColRef srcCol = ((ColumnTupleExpression) tupleExpr).getColumn();
- if (this.context.belongToFactTable(srcCol)) {
+ if (this.context.belongToFactTableDims(srcCol)) {
tupleExpr = getCountColumnExpression(srcCol);
TblColRef column = TblColRef.newInnerColumn(tupleExpr.getDigest(),
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 20533ad..f3dcd1b 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -33,6 +33,8 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.expression.ExpressionColCollector;
+import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.DataModelDesc;
@@ -158,6 +160,8 @@ public class OLAPContext {
// dynamic columns info, note that the name of TblColRef will be the field name
public Map<TblColRef, RelDataType> dynamicFields = new HashMap<>();
+ public Map<TblColRef, TupleExpression> dynGroupBy = new HashMap<>();
+
// hive query
public String sql = "";
@@ -172,6 +176,9 @@ public class OLAPContext {
public SQLDigest getSQLDigest() {
if (sqlDigest == null) {
Set<TblColRef> rtDimColumns = new HashSet<>();
+ for (TupleExpression tupleExpr : dynGroupBy.values()) {
+ rtDimColumns.addAll(ExpressionColCollector.collectColumns(tupleExpr));
+ }
Set<TblColRef> rtMetricColumns = new HashSet<>();
List<DynamicFunctionDesc> dynFuncs = Lists.newLinkedList();
for (FunctionDesc functionDesc : aggregations) {
@@ -183,7 +190,7 @@ public class OLAPContext {
}
}
sqlDigest = new SQLDigest(firstTableScan.getTableName(), allColumns, joins, // model
- groupByColumns, subqueryJoinParticipants, // group by
+ groupByColumns, subqueryJoinParticipants, dynGroupBy, // group by
metricsColumns, aggregations, aggrSqlCalls, dynFuncs, // aggregation
rtDimColumns, rtMetricColumns, // runtime related columns
filterColumns, filter, havingFilter, // filter
@@ -211,7 +218,7 @@ public class OLAPContext {
return false;
}
- public boolean belongToFactTable(TblColRef tblColRef) {
+ public boolean belongToFactTableDims(TblColRef tblColRef) {
if (!belongToContextTables(tblColRef)) {
return false;
}
--
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.