You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/06/23 08:43:34 UTC
kylin git commit: refactor about default gtstore
Repository: kylin
Updated Branches:
refs/heads/master 62ae3cb16 -> 4fd74fc6a
refactor about default gtstore
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4fd74fc6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4fd74fc6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4fd74fc6
Branch: refs/heads/master
Commit: 4fd74fc6a7d7c9bcccab38b57da76f2b983cf5ef
Parents: 62ae3cb
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Jun 23 16:37:09 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Jun 23 16:37:09 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/gridtable/ScannerWorker.java | 11 +-
.../storage/gtrecord/CubeSegmentScanner.java | 100 +++++
.../storage/gtrecord/CubeTupleConverter.java | 270 +++++++++++++
.../gtrecord/GTCubeStorageQueryBase.java | 377 +++++++++++++++++++
.../gtrecord/SequentialCubeTupleIterator.java | 210 +++++++++++
.../hbase/cube/v2/CubeSegmentScanner.java | 100 -----
.../storage/hbase/cube/v2/CubeStorageQuery.java | 349 +----------------
.../hbase/cube/v2/CubeTupleConverter.java | 270 -------------
.../cube/v2/SequentialCubeTupleIterator.java | 210 -----------
9 files changed, 968 insertions(+), 929 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
index 1ac3b02..586a584 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
@@ -24,8 +24,6 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.slf4j.Logger;
@@ -36,7 +34,7 @@ public class ScannerWorker {
private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
private IGTScanner internal = null;
- public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest) {
+ public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) {
if (scanRequest == null) {
logger.info("Segment {} will be skipped", cubeSeg);
internal = new EmptyGTScanner(0);
@@ -46,12 +44,7 @@ public class ScannerWorker {
final GTInfo info = scanRequest.getInfo();
try {
- IGTStorage rpc;
- if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
- rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // for local debug
- } else {
- rpc = (IGTStorage) Class.forName(KylinConfig.getInstanceFromEnv().getDefaultIGTStorage()).getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // default behavior
- }
+ IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // default behavior
internal = rpc.getGTScanner(scanRequest);
} catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
throw new RuntimeException("error", e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
new file mode 100644
index 0000000..c12159d
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dict.BuildInFunctionTransformer;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRangePlanner;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.ScannerWorker;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scans a single cube segment and exposes the result as an iterator of GTRecords.
+ * Plans the scan via a reflectively-loaded GTScanRangePlanner, then delegates the
+ * actual read to a ScannerWorker backed by the given IGTStorage implementation.
+ */
+public class CubeSegmentScanner implements IGTScanner {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class);
+
+ final CubeSegment cubeSeg;
+ final ScannerWorker scanner;
+ final Cuboid cuboid;
+
+ // null when the planner decides this segment can be skipped entirely
+ final GTScanRequest scanRequest;
+
+ /**
+ * @param gtStorage fully qualified class name of the IGTStorage implementation
+ *                  that ScannerWorker will instantiate to read the grid table
+ */
+ public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
+ Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context,String gtStorage) {
+ this.cuboid = cuboid;
+ this.cubeSeg = cubeSeg;
+
+ // translate FunctionTupleFilter to IN clause
+ ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
+ filter = translator.transform(filter);
+
+ // planner class is configurable; it is constructed reflectively and must expose a
+ // (CubeSegment, Cuboid, TupleFilter, Set, Set, Collection) constructor
+ String plannerName = KylinConfig.getInstanceFromEnv().getQueryStorageVisitPlanner();
+ GTScanRangePlanner scanRangePlanner;
+ try {
+ scanRangePlanner = (GTScanRangePlanner) Class.forName(plannerName).getConstructor(CubeSegment.class, Cuboid.class, TupleFilter.class, Set.class, Set.class, Collection.class).newInstance(cubeSeg, cuboid, filter, dimensions, groups, metrics);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ scanRequest = scanRangePlanner.planScanRequest();
+ if (scanRequest != null) {
+ scanRequest.setAllowPreAggregation(!context.isExactAggregation());
+ scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+ if (context.isLimitEnabled())
+ scanRequest.setRowLimit(context.getLimit());
+ }
+ scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest,gtStorage);
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return scanner.iterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ // scanRequest is null only when the planner skipped this segment
+ return scanRequest == null ? null : scanRequest.getInfo();
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ return scanner.getScannedRowCount();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
new file mode 100644
index 0000000..d6917e1
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * convert GTRecord to tuple
+ */
+public class CubeTupleConverter {
+
+ final CubeSegment cubeSeg;
+ final Cuboid cuboid;
+ final TupleInfo tupleInfo;
+ // fillers that copy derived (lookup / PK-FK) columns into the tuple
+ final List<IDerivedColumnFiller> derivedColFillers;
+
+ // parallel arrays, one slot per selected dimension followed by one per selected metric:
+ // gtColIdx = column index in the grid table record,
+ // tupleIdx = target index in the output tuple (-1 when the tuple doesn't want it),
+ // gtValues = reusable scratch buffer for the record's values
+ final int[] gtColIdx;
+ final int[] tupleIdx;
+ final Object[] gtValues;
+ final MeasureType<?>[] measureTypes;
+
+ // measures that need advanced (possibly row-splitting) filling, handled by the caller
+ final List<IAdvMeasureFiller> advMeasureFillers;
+ final List<Integer> advMeasureIndexInGTValues;
+
+ // number of leading slots in the parallel arrays that are dimensions
+ final int nSelectedDims;
+
+ public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+ this.cubeSeg = cubeSeg;
+ this.cuboid = cuboid;
+ this.tupleInfo = returnTupleInfo;
+ this.derivedColFillers = Lists.newArrayList();
+
+ List<TblColRef> cuboidDims = cuboid.getColumns();
+ CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+
+ nSelectedDims = selectedDimensions.size();
+ gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
+ tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
+ gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
+
+ // measure types don't have this many, but aligned length make programming easier
+ measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()];
+
+ advMeasureFillers = Lists.newArrayListWithCapacity(1);
+ advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
+
+ // iii walks the parallel arrays across dimensions first, then metrics
+ int iii = 0;
+
+ // pre-calculate dimension index mapping to tuple
+ for (TblColRef dim : selectedDimensions) {
+ int i = mapping.getIndexOf(dim);
+ gtColIdx[iii] = i;
+ tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
+ iii++;
+ }
+
+ for (FunctionDesc metric : selectedMetrics) {
+ int i = mapping.getIndexOf(metric);
+ gtColIdx[iii] = i;
+
+ if (metric.needRewrite()) {
+ String rewriteFieldName = metric.getRewriteFieldName();
+ tupleIdx[iii] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
+ }
+ // a non-rewrite metrics (like sum, or dimension playing as metrics) is like a dimension column
+ else {
+ TblColRef col = metric.getParameter().getColRefs().get(0);
+ tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
+ }
+
+ MeasureType<?> measureType = metric.getMeasureType();
+ if (measureType.needAdvancedTupleFilling()) {
+ Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric));
+ advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap));
+ advMeasureIndexInGTValues.add(iii);
+ } else {
+ measureTypes[iii] = measureType;
+ }
+
+ iii++;
+ }
+
+ // prepare derived columns and filler
+ Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
+ for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
+ TblColRef[] hostCols = entry.getKey().data;
+ for (DeriveInfo deriveInfo : entry.getValue()) {
+ IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo);
+ if (filler != null) {
+ derivedColFillers.add(filler);
+ }
+ }
+ }
+ }
+
+ // load only needed dictionaries
+ private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
+ Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+ for (TblColRef col : columnsNeedDictionary) {
+ result.put(col, cubeSeg.getDictionary(col));
+ }
+ return result;
+ }
+
+ /**
+ * Copies one GTRecord into the given tuple (dimensions, simple measures,
+ * derived columns). Returns the advanced measure fillers still pending,
+ * or null when there are none; the caller must complete those.
+ */
+ public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) {
+
+ record.getValues(gtColIdx, gtValues);
+
+ // dimensions
+ for (int i = 0; i < nSelectedDims; i++) {
+ int ti = tupleIdx[i];
+ if (ti >= 0) {
+ tuple.setDimensionValue(ti, toString(gtValues[i]));
+ }
+ }
+
+ // measures
+ for (int i = nSelectedDims; i < gtColIdx.length; i++) {
+ int ti = tupleIdx[i];
+ if (ti >= 0 && measureTypes[i] != null) {
+ measureTypes[i].fillTupleSimply(tuple, ti, gtValues[i]);
+ }
+ }
+
+ // derived
+ for (IDerivedColumnFiller filler : derivedColFillers) {
+ filler.fillDerivedColumns(gtValues, tuple);
+ }
+
+ // advanced measure filling, due to possible row split, will complete at caller side
+ if (advMeasureFillers.isEmpty()) {
+ return null;
+ } else {
+ for (int i = 0; i < advMeasureFillers.size(); i++) {
+ Object measureValue = gtValues[advMeasureIndexInGTValues.get(i)];
+ advMeasureFillers.get(i).reload(measureValue);
+ }
+ return advMeasureFillers;
+ }
+ }
+
+ private interface IDerivedColumnFiller {
+ public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
+ }
+
+ /**
+ * Builds a filler for one derive info, or returns null when either a host
+ * column is missing from the GT values or no derived column is wanted by
+ * the output tuple.
+ */
+ private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) {
+ boolean allHostsPresent = true;
+ final int[] hostTmpIdx = new int[hostCols.length];
+ for (int i = 0; i < hostCols.length; i++) {
+ hostTmpIdx[i] = indexOnTheGTValues(hostCols[i]);
+ allHostsPresent = allHostsPresent && hostTmpIdx[i] >= 0;
+ }
+
+ boolean needCopyDerived = false;
+ final int[] derivedTupleIdx = new int[deriveInfo.columns.length];
+ for (int i = 0; i < deriveInfo.columns.length; i++) {
+ TblColRef col = deriveInfo.columns[i];
+ derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
+ needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0;
+ }
+
+ if ((allHostsPresent && needCopyDerived) == false)
+ return null;
+
+ switch (deriveInfo.type) {
+ case LOOKUP:
+ // resolve derived values through the segment's snapshot lookup table
+ return new IDerivedColumnFiller() {
+ CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+ LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
+ int[] derivedColIdx = initDerivedColIdx();
+ Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
+
+ private int[] initDerivedColIdx() {
+ int[] idx = new int[deriveInfo.columns.length];
+ for (int i = 0; i < idx.length; i++) {
+ idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+ }
+ return idx;
+ }
+
+ @Override
+ public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+ for (int i = 0; i < hostTmpIdx.length; i++) {
+ lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
+ }
+
+ String[] lookupRow = lookupTable.getRow(lookupKey);
+
+ if (lookupRow != null) {
+ for (int i = 0; i < derivedTupleIdx.length; i++) {
+ if (derivedTupleIdx[i] >= 0) {
+ String value = lookupRow[derivedColIdx[i]];
+ tuple.setDimensionValue(derivedTupleIdx[i], value);
+ }
+ }
+ } else {
+ // no matching lookup row: null out the wanted derived columns
+ for (int i = 0; i < derivedTupleIdx.length; i++) {
+ if (derivedTupleIdx[i] >= 0) {
+ tuple.setDimensionValue(derivedTupleIdx[i], null);
+ }
+ }
+ }
+ }
+ };
+ case PK_FK:
+ return new IDerivedColumnFiller() {
+ @Override
+ public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+ // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
+ tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
+ }
+ };
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ // maps a cuboid column to its slot in gtValues, or -1 when not selected
+ private int indexOnTheGTValues(TblColRef col) {
+ List<TblColRef> cuboidDims = cuboid.getColumns();
+ int cuboidIdx = cuboidDims.indexOf(col);
+ for (int i = 0; i < gtColIdx.length; i++) {
+ if (gtColIdx[i] == cuboidIdx)
+ return i;
+ }
+ return -1;
+ }
+
+ // null-safe Object -> String used for dimension values
+ private static String toString(Object o) {
+ return o == null ? null : o.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
new file mode 100644
index 0000000..e58e74a
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.IStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.translate.DerivedFilterTranslator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Storage-engine-agnostic base for cube queries over the grid table layer.
+ * Translates a SQLDigest into per-segment scans (CubeSegmentScanner) and a
+ * SequentialCubeTupleIterator. Subclasses only supply the IGTStorage class
+ * name via getGTStorage().
+ */
+public abstract class GTCubeStorageQueryBase implements IStorageQuery {
+
+ private static final Logger logger = LoggerFactory.getLogger(GTCubeStorageQueryBase.class);
+
+ private final CubeInstance cubeInstance;
+ private final CubeDesc cubeDesc;
+
+ public GTCubeStorageQueryBase(CubeInstance cube) {
+ this.cubeInstance = cube;
+ this.cubeDesc = cube.getDescriptor();
+ }
+
+ @Override
+ public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ // allow custom measures hack
+ notifyBeforeStorageQuery(sqlDigest);
+
+ Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TupleFilter filter = sqlDigest.filter;
+
+ // build dimension & metrics
+ Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
+ Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+ buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
+
+ // all dimensions = groups + other(like filter) dimensions
+ Set<TblColRef> otherDims = Sets.newHashSet(dimensions);
+ otherDims.removeAll(groups);
+
+ // expand derived (xxxD means contains host columns only, derived columns were translated)
+ Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
+ Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
+ Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation);
+ otherDimsD.removeAll(groupsD);
+
+ // identify cuboid
+ Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+ dimensionsD.addAll(groupsD);
+ dimensionsD.addAll(otherDimsD);
+ Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics);
+ context.setCuboid(cuboid);
+
+ // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
+ Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
+ boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
+ context.setExactAggregation(isExactAggregation);
+
+ // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
+ TupleFilter filterD = translateDerived(filter, groupsD);
+
+ setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
+ setLimit(filter, context);
+
+ // one scanner per READY segment; getGTStorage() picks the storage RPC class
+ List<CubeSegmentScanner> scanners = Lists.newArrayList();
+ for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+ CubeSegmentScanner scanner;
+ if (cubeSeg.getInputRecords() == 0) {
+ logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg);
+ }
+ scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context, getGTStorage());
+ scanners.add(scanner);
+ }
+
+ if (scanners.isEmpty())
+ return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+
+ return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
+ }
+
+ /** @return fully qualified class name of the IGTStorage implementation to scan with */
+ protected abstract String getGTStorage();
+
+ // splits the SQL digest into the dimension set and the metric set for cuboid matching
+ private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
+ for (FunctionDesc func : sqlDigest.aggregations) {
+ if (!func.isDimensionAsMetric()) {
+ // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision
+ metrics.add(findAggrFuncFromCubeDesc(func));
+ }
+ }
+
+ for (TblColRef column : sqlDigest.allColumns) {
+ // skip measure columns
+ if (sqlDigest.metricColumns.contains(column) && !(sqlDigest.groupbyColumns.contains(column) || sqlDigest.filterColumns.contains(column))) {
+ continue;
+ }
+
+ dimensions.add(column);
+ }
+ }
+
+ // prefer the cube desc's own FunctionDesc (richer metadata) over the digest's
+ private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ if (measure.getFunction().equals(aggrFunc))
+ return measure.getFunction();
+ }
+ return aggrFunc;
+ }
+
+ /**
+ * Replaces derived columns with their host columns; hosts of non one-to-one
+ * derivations are also collected into derivedPostAggregation.
+ */
+ private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
+ Set<TblColRef> expanded = Sets.newHashSet();
+ for (TblColRef col : cols) {
+ if (cubeDesc.hasHostColumn(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ for (TblColRef hostCol : hostInfo.columns) {
+ expanded.add(hostCol);
+ if (hostInfo.isOneToOne == false)
+ derivedPostAggregation.add(hostCol);
+ }
+ } else {
+ expanded.add(col);
+ }
+ }
+ return expanded;
+ }
+
+ /**
+ * Finds columns constrained to a single value by the filter (COL = const,
+ * possibly under a top-level AND), expanded to host columns where derived.
+ */
+ @SuppressWarnings("unchecked")
+ private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
+ Collection<? extends TupleFilter> toCheck;
+ if (filter instanceof CompareTupleFilter) {
+ toCheck = Collections.singleton(filter);
+ } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+ toCheck = filter.getChildren();
+ } else {
+ return (Set<TblColRef>) Collections.EMPTY_SET;
+ }
+
+ Set<TblColRef> result = Sets.newHashSet();
+ for (TupleFilter f : toCheck) {
+ if (f instanceof CompareTupleFilter) {
+ CompareTupleFilter compFilter = (CompareTupleFilter) f;
+ // is COL=const ?
+ if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
+ result.add(compFilter.getColumn());
+ }
+ }
+ }
+
+ // expand derived
+ Set<TblColRef> resultD = Sets.newHashSet();
+ for (TblColRef col : result) {
+ if (cubeDesc.isExtendedColumn(col)) {
+ throw new CubeDesc.CannotFilterExtendedColumnException(col);
+ }
+ if (cubeDesc.isDerived(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ if (hostInfo.isOneToOne) {
+ for (TblColRef hostCol : hostInfo.columns) {
+ resultD.add(hostCol);
+ }
+ }
+ //if not one2one, it will be pruned
+ } else {
+ resultD.add(col);
+ }
+ }
+ return resultD;
+ }
+
+ /**
+ * True when storage results need no further aggregation in the query engine;
+ * each disqualifying condition is logged for diagnosis.
+ */
+ private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
+ boolean exact = true;
+
+ if (cuboid.requirePostAggregation()) {
+ exact = false;
+ logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+ }
+
+ // derived aggregation is bad, unless expanded columns are already in group by
+ if (groups.containsAll(derivedPostAggregation) == false) {
+ exact = false;
+ logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+ }
+
+ // other columns (from filter) is bad, unless they are ensured to have single value
+ if (singleValuesD.containsAll(othersD) == false) {
+ exact = false;
+ logger.info("exactAggregation is false because some column not on group by: " + othersD //
+ + " (single value column: " + singleValuesD + ")");
+ }
+
+ // for partitioned cube, the partition column must belong to group by or has single value
+ PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
+ if (partDesc.isPartitioned()) {
+ TblColRef col = partDesc.getPartitionDateColumnRef();
+ if (!groups.contains(col) && !singleValuesD.contains(col)) {
+ exact = false;
+ logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
+ }
+ }
+
+ if (exact) {
+ logger.info("exactAggregation is true");
+ }
+ return exact;
+ }
+
+ /**
+ * Recursively rewrites compare filters on derived columns to host-column
+ * filters; columns from loosened conditions are added to the collector so
+ * they join the group-by.
+ */
+ @SuppressWarnings("unchecked")
+ private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
+ if (filter == null)
+ return filter;
+
+ if (filter instanceof CompareTupleFilter) {
+ return translateDerivedInCompare((CompareTupleFilter) filter, collector);
+ }
+
+ List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
+ List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
+ boolean modified = false;
+ for (TupleFilter child : children) {
+ TupleFilter translated = translateDerived(child, collector);
+ newChildren.add(translated);
+ if (child != translated)
+ modified = true;
+ }
+ if (modified) {
+ // rebuild rather than mutate, so the original filter tree stays intact
+ filter = replaceChildren(filter, newChildren);
+ }
+ return filter;
+ }
+
+ private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
+ if (filter instanceof LogicalTupleFilter) {
+ LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
+ r.addChildren(newChildren);
+ return r;
+ } else
+ throw new IllegalStateException("Cannot replaceChildren on " + filter);
+ }
+
+ private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
+ if (compf.getColumn() == null || compf.getValues().isEmpty())
+ return compf;
+
+ TblColRef derived = compf.getColumn();
+ if (cubeDesc.isExtendedColumn(derived)) {
+ throw new CubeDesc.CannotFilterExtendedColumnException(derived);
+ }
+ if (cubeDesc.isDerived(derived) == false)
+ return compf;
+
+ // translate via the lookup table of the latest READY segment
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
+ CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
+ CubeSegment seg = cubeInstance.getLatestReadySegment();
+ LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
+ Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+ TupleFilter translatedFilter = translated.getFirst();
+ boolean loosened = translated.getSecond();
+ if (loosened) {
+ collectColumnsRecursively(translatedFilter, collector);
+ }
+ return translatedFilter;
+ }
+
+ private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
+ if (filter == null)
+ return;
+
+ if (filter instanceof ColumnTupleFilter) {
+ collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+ }
+ for (TupleFilter child : filter.getChildren()) {
+ collectColumnsRecursively(child, collector);
+ }
+ }
+
+ private void collectColumns(TblColRef col, Set<TblColRef> collector) {
+ if (cubeDesc.isExtendedColumn(col)) {
+ throw new CubeDesc.CannotFilterExtendedColumnException(col);
+ }
+ if (cubeDesc.isDerived(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ for (TblColRef h : hostInfo.columns)
+ collector.add(h);
+ } else {
+ collector.add(col);
+ }
+ }
+
+ // caps row count by memory budget when memory-hungry measures are involved
+ private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
+ boolean hasMemHungryMeasure = false;
+ for (FunctionDesc func : metrics) {
+ hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry();
+ }
+
+ // need to limit the memory usage for memory hungry measures
+ if (hasMemHungryMeasure == false) {
+ return;
+ }
+
+ // rough per-row size estimate: 3 bytes per dimension plus measure storage sizes
+ int rowSizeEst = dimensions.size() * 3;
+ for (FunctionDesc func : metrics) {
+ // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage)
+ rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
+ }
+
+ long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
+ if (rowEst > 0) {
+ logger.info("Memory budget is set to " + rowEst + " rows");
+ context.setThreshold((int) rowEst);
+ } else {
+ logger.info("Memory budget is not set.");
+ }
+ }
+
+ // push-down limit is safe only with exact aggregation, an evaluable filter, and no sort
+ private void setLimit(TupleFilter filter, StorageContext context) {
+ boolean goodAggr = context.isExactAggregation();
+ boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+ boolean goodSort = context.hasSort() == false;
+ if (goodAggr && goodFilter && goodSort) {
+ logger.info("Enable limit " + context.getLimit());
+ context.enableLimit();
+ }
+ }
+
+ // lets each measure type adjust the digest before planning (custom measures hack)
+ private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ MeasureType<?> measureType = measure.getFunction().getMeasureType();
+ measureType.adjustSqlDigest(measure, sqlDigest);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
new file mode 100644
index 0000000..3681e5e
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.exception.ScanOutOfLimitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequentialCubeTupleIterator implements ITupleIterator {
+
+ private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
+
+ protected final Cuboid cuboid;
+ protected final Set<TblColRef> selectedDimensions;
+ protected final Set<FunctionDesc> selectedMetrics;
+ protected final TupleInfo tupleInfo;
+ protected final Tuple tuple;
+ protected final Iterator<CubeSegmentScanner> scannerIterator;
+ protected final StorageContext context;
+
+ protected CubeSegmentScanner curScanner;
+ protected Iterator<GTRecord> curRecordIterator;
+ protected CubeTupleConverter curTupleConverter;
+ protected Tuple next;
+
+ private List<IAdvMeasureFiller> advMeasureFillers;
+ private int advMeasureRowsRemaining;
+ private int advMeasureRowIndex;
+
+ private int scanCount;
+ private int scanCountDelta;
+
+ public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+ Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
+ this.cuboid = cuboid;
+ this.selectedDimensions = selectedDimensions;
+ this.selectedMetrics = selectedMetrics;
+ this.tupleInfo = returnTupleInfo;
+ this.tuple = new Tuple(returnTupleInfo);
+ this.scannerIterator = scanners.iterator();
+ this.context = context;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ if (hitLimitAndThreshold())
+ return false;
+
+ // consume any remaining rows from the advanced measure filler
+ if (advMeasureRowsRemaining > 0) {
+ for (IAdvMeasureFiller filler : advMeasureFillers) {
+ filler.fillTuple(tuple, advMeasureRowIndex);
+ }
+ advMeasureRowIndex++;
+ advMeasureRowsRemaining--;
+ next = tuple;
+ return true;
+ }
+
+ // get the next GTRecord
+ if (curScanner == null) {
+ if (scannerIterator.hasNext()) {
+ curScanner = scannerIterator.next();
+ curRecordIterator = curScanner.iterator();
+ if (curRecordIterator.hasNext()) {
+ //if the segment does not have any tuples, don't bother to create a converter
+ curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+ }
+ } else {
+ return false;
+ }
+ }
+ if (curRecordIterator.hasNext() == false) {
+ close(curScanner);
+ curScanner = null;
+ curRecordIterator = null;
+ curTupleConverter = null;
+ return hasNext();
+ }
+
+ // now we have a GTRecord
+ GTRecord curRecord = curRecordIterator.next();
+
+ // translate into tuple
+ advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple);
+
+ // the simple case
+ if (advMeasureFillers == null) {
+ next = tuple;
+ return true;
+ }
+
+ // advanced measure filling, like TopN, will produce multiple tuples out of one record
+ advMeasureRowsRemaining = -1;
+ for (IAdvMeasureFiller filler : advMeasureFillers) {
+ if (advMeasureRowsRemaining < 0)
+ advMeasureRowsRemaining = filler.getNumOfRows();
+ if (advMeasureRowsRemaining != filler.getNumOfRows())
+ throw new IllegalStateException();
+ }
+ if (advMeasureRowsRemaining < 0)
+ throw new IllegalStateException();
+
+ advMeasureRowIndex = 0;
+ return hasNext();
+ }
+
+
+ private boolean hitLimitAndThreshold() {
+ // check limit
+ if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
+ return true;
+ }
+ // check threshold
+ if (scanCount >= context.getThreshold()) {
+ throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
+ }
+ return false;
+ }
+
+ @Override
+ public ITuple next() {
+ // fetch next record
+ if (next == null) {
+ hasNext();
+ if (next == null)
+ throw new NoSuchElementException();
+ }
+
+ scanCount++;
+ if (++scanCountDelta >= 1000)
+ flushScanCountDelta();
+
+ ITuple result = next;
+ next = null;
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ // the hasNext() loop may exit early because of limit, threshold, etc.;
+ // close all the remaining segment scanners here
+ flushScanCountDelta();
+
+ if (curScanner != null)
+ close(curScanner);
+
+ while (scannerIterator.hasNext()) {
+ close(scannerIterator.next());
+ }
+ }
+
+ protected void close(CubeSegmentScanner scanner) {
+ try {
+ scanner.close();
+ } catch (IOException e) {
+ logger.error("Exception when close CubeScanner", e);
+ }
+ }
+
+ public int getScanCount() {
+ return scanCount;
+ }
+
+ private void flushScanCountDelta() {
+ context.increaseTotalScanCount(scanCountDelta);
+ scanCountDelta = 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
deleted file mode 100644
index 9890ae9..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.dict.BuildInFunctionTransformer;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRangePlanner;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.gridtable.ScannerWorker;
-import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.StorageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CubeSegmentScanner implements IGTScanner {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class);
-
- final CubeSegment cubeSeg;
- final ScannerWorker scanner;
- final Cuboid cuboid;
-
- final GTScanRequest scanRequest;
-
- public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
- Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context) {
- this.cuboid = cuboid;
- this.cubeSeg = cubeSeg;
-
- // translate FunctionTupleFilter to IN clause
- ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
- filter = translator.transform(filter);
-
- String plannerName = KylinConfig.getInstanceFromEnv().getQueryStorageVisitPlanner();
- GTScanRangePlanner scanRangePlanner;
- try {
- scanRangePlanner = (GTScanRangePlanner) Class.forName(plannerName).getConstructor(CubeSegment.class, Cuboid.class, TupleFilter.class, Set.class, Set.class, Collection.class).newInstance(cubeSeg, cuboid, filter, dimensions, groups, metrics);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- scanRequest = scanRangePlanner.planScanRequest();
- if (scanRequest != null) {
- scanRequest.setAllowPreAggregation(!context.isExactAggregation());
- scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
- if (context.isLimitEnabled())
- scanRequest.setRowLimit(context.getLimit());
- }
- scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest);
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- return scanner.iterator();
- }
-
- @Override
- public void close() throws IOException {
- scanner.close();
- }
-
- @Override
- public GTInfo getInfo() {
- return scanRequest == null ? null : scanRequest.getInfo();
- }
-
- @Override
- public int getScannedRowCount() {
- return scanner.getScannedRowCount();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index cec4e2f..f9c9a2b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -18,358 +18,27 @@
package org.apache.kylin.storage.hbase.cube.v2;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.IStorageQuery;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.translate.DerivedFilterTranslator;
+import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class CubeStorageQuery implements IStorageQuery {
+public class CubeStorageQuery extends GTCubeStorageQueryBase {
private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
- private final CubeInstance cubeInstance;
- private final CubeDesc cubeDesc;
-
public CubeStorageQuery(CubeInstance cube) {
- this.cubeInstance = cube;
- this.cubeDesc = cube.getDescriptor();
+ super(cube);
}
@Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- // allow custom measures hack
- notifyBeforeStorageQuery(sqlDigest);
-
- Collection<TblColRef> groups = sqlDigest.groupbyColumns;
- TupleFilter filter = sqlDigest.filter;
-
- // build dimension & metrics
- Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
- Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
- buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
-
- // all dimensions = groups + other(like filter) dimensions
- Set<TblColRef> otherDims = Sets.newHashSet(dimensions);
- otherDims.removeAll(groups);
-
- // expand derived (xxxD means contains host columns only, derived columns were translated)
- Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
- Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
- Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation);
- otherDimsD.removeAll(groupsD);
-
- // identify cuboid
- Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
- dimensionsD.addAll(groupsD);
- dimensionsD.addAll(otherDimsD);
- Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics);
- context.setCuboid(cuboid);
-
- // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
- Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
- boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
- context.setExactAggregation(isExactAggregation);
-
- // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
- TupleFilter filterD = translateDerived(filter, groupsD);
-
- setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
- setLimit(filter, context);
-
- List<CubeSegmentScanner> scanners = Lists.newArrayList();
- for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
- CubeSegmentScanner scanner;
- if (cubeSeg.getInputRecords() == 0) {
- logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg);
- }
- scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context);
- scanners.add(scanner);
- }
-
- if (scanners.isEmpty())
- return ITupleIterator.EMPTY_TUPLE_ITERATOR;
-
- return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
- }
-
- private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
- for (FunctionDesc func : sqlDigest.aggregations) {
- if (!func.isDimensionAsMetric()) {
- // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision
- metrics.add(findAggrFuncFromCubeDesc(func));
- }
- }
-
- for (TblColRef column : sqlDigest.allColumns) {
- // skip measure columns
- if (sqlDigest.metricColumns.contains(column) && !(sqlDigest.groupbyColumns.contains(column) || sqlDigest.filterColumns.contains(column))) {
- continue;
- }
-
- dimensions.add(column);
- }
- }
-
- private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
- for (MeasureDesc measure : cubeDesc.getMeasures()) {
- if (measure.getFunction().equals(aggrFunc))
- return measure.getFunction();
- }
- return aggrFunc;
- }
-
- private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
- Set<TblColRef> expanded = Sets.newHashSet();
- for (TblColRef col : cols) {
- if (cubeDesc.hasHostColumn(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- for (TblColRef hostCol : hostInfo.columns) {
- expanded.add(hostCol);
- if (hostInfo.isOneToOne == false)
- derivedPostAggregation.add(hostCol);
- }
- } else {
- expanded.add(col);
- }
- }
- return expanded;
- }
-
- @SuppressWarnings("unchecked")
- private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
- Collection<? extends TupleFilter> toCheck;
- if (filter instanceof CompareTupleFilter) {
- toCheck = Collections.singleton(filter);
- } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
- toCheck = filter.getChildren();
+ protected String getGTStorage() {
+ if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
+ return "org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC";
} else {
- return (Set<TblColRef>) Collections.EMPTY_SET;
- }
-
- Set<TblColRef> result = Sets.newHashSet();
- for (TupleFilter f : toCheck) {
- if (f instanceof CompareTupleFilter) {
- CompareTupleFilter compFilter = (CompareTupleFilter) f;
- // is COL=const ?
- if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
- result.add(compFilter.getColumn());
- }
- }
- }
-
- // expand derived
- Set<TblColRef> resultD = Sets.newHashSet();
- for (TblColRef col : result) {
- if (cubeDesc.isExtendedColumn(col)) {
- throw new CubeDesc.CannotFilterExtendedColumnException(col);
- }
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- if (hostInfo.isOneToOne) {
- for (TblColRef hostCol : hostInfo.columns) {
- resultD.add(hostCol);
- }
- }
- //if not one2one, it will be pruned
- } else {
- resultD.add(col);
- }
+ return KylinConfig.getInstanceFromEnv().getDefaultIGTStorage();
}
- return resultD;
}
-
- private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
- boolean exact = true;
-
- if (cuboid.requirePostAggregation()) {
- exact = false;
- logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
- }
-
- // derived aggregation is bad, unless expanded columns are already in group by
- if (groups.containsAll(derivedPostAggregation) == false) {
- exact = false;
- logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
- }
-
- // other columns (from filter) is bad, unless they are ensured to have single value
- if (singleValuesD.containsAll(othersD) == false) {
- exact = false;
- logger.info("exactAggregation is false because some column not on group by: " + othersD //
- + " (single value column: " + singleValuesD + ")");
- }
-
- // for partitioned cube, the partition column must belong to group by or has single value
- PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
- if (partDesc.isPartitioned()) {
- TblColRef col = partDesc.getPartitionDateColumnRef();
- if (!groups.contains(col) && !singleValuesD.contains(col)) {
- exact = false;
- logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
- }
- }
-
- if (exact) {
- logger.info("exactAggregation is true");
- }
- return exact;
- }
-
- @SuppressWarnings("unchecked")
- private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
- if (filter == null)
- return filter;
-
- if (filter instanceof CompareTupleFilter) {
- return translateDerivedInCompare((CompareTupleFilter) filter, collector);
- }
-
- List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
- List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
- boolean modified = false;
- for (TupleFilter child : children) {
- TupleFilter translated = translateDerived(child, collector);
- newChildren.add(translated);
- if (child != translated)
- modified = true;
- }
- if (modified) {
- filter = replaceChildren(filter, newChildren);
- }
- return filter;
- }
-
- private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
- if (filter instanceof LogicalTupleFilter) {
- LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
- r.addChildren(newChildren);
- return r;
- } else
- throw new IllegalStateException("Cannot replaceChildren on " + filter);
- }
-
- private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
- if (compf.getColumn() == null || compf.getValues().isEmpty())
- return compf;
-
- TblColRef derived = compf.getColumn();
- if (cubeDesc.isExtendedColumn(derived)) {
- throw new CubeDesc.CannotFilterExtendedColumnException(derived);
- }
- if (cubeDesc.isDerived(derived) == false)
- return compf;
-
- DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
- CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
- CubeSegment seg = cubeInstance.getLatestReadySegment();
- LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
- Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
- TupleFilter translatedFilter = translated.getFirst();
- boolean loosened = translated.getSecond();
- if (loosened) {
- collectColumnsRecursively(translatedFilter, collector);
- }
- return translatedFilter;
- }
-
- private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
- if (filter == null)
- return;
-
- if (filter instanceof ColumnTupleFilter) {
- collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
- }
- for (TupleFilter child : filter.getChildren()) {
- collectColumnsRecursively(child, collector);
- }
- }
-
- private void collectColumns(TblColRef col, Set<TblColRef> collector) {
- if (cubeDesc.isExtendedColumn(col)) {
- throw new CubeDesc.CannotFilterExtendedColumnException(col);
- }
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- for (TblColRef h : hostInfo.columns)
- collector.add(h);
- } else {
- collector.add(col);
- }
- }
-
- private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
- boolean hasMemHungryMeasure = false;
- for (FunctionDesc func : metrics) {
- hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry();
- }
-
- // need to limit the memory usage for memory hungry measures
- if (hasMemHungryMeasure == false) {
- return;
- }
-
- int rowSizeEst = dimensions.size() * 3;
- for (FunctionDesc func : metrics) {
- // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage)
- rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
- }
-
- long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
- if (rowEst > 0) {
- logger.info("Memory budget is set to " + rowEst + " rows");
- context.setThreshold((int) rowEst);
- } else {
- logger.info("Memory budget is not set.");
- }
- }
-
- private void setLimit(TupleFilter filter, StorageContext context) {
- boolean goodAggr = context.isExactAggregation();
- boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
- boolean goodSort = context.hasSort() == false;
- if (goodAggr && goodFilter && goodSort) {
- logger.info("Enable limit " + context.getLimit());
- context.enableLimit();
- }
- }
-
- private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
- for (MeasureDesc measure : cubeDesc.getMeasures()) {
- MeasureType<?> measureType = measure.getFunction().getMeasureType();
- measureType.adjustSqlDigest(measure, sqlDigest);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
deleted file mode 100644
index a7346af..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * convert GTRecord to tuple
- */
-public class CubeTupleConverter {
-
- final CubeSegment cubeSeg;
- final Cuboid cuboid;
- final TupleInfo tupleInfo;
- final List<IDerivedColumnFiller> derivedColFillers;
-
- final int[] gtColIdx;
- final int[] tupleIdx;
- final Object[] gtValues;
- final MeasureType<?>[] measureTypes;
-
- final List<IAdvMeasureFiller> advMeasureFillers;
- final List<Integer> advMeasureIndexInGTValues;
-
- final int nSelectedDims;
-
- public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
- Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
- this.cubeSeg = cubeSeg;
- this.cuboid = cuboid;
- this.tupleInfo = returnTupleInfo;
- this.derivedColFillers = Lists.newArrayList();
-
- List<TblColRef> cuboidDims = cuboid.getColumns();
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
- nSelectedDims = selectedDimensions.size();
- gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
- tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
- gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
-
- // measure types don't have this many, but aligned length make programming easier
- measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()];
-
- advMeasureFillers = Lists.newArrayListWithCapacity(1);
- advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
-
- int iii = 0;
-
- // pre-calculate dimension index mapping to tuple
- for (TblColRef dim : selectedDimensions) {
- int i = mapping.getIndexOf(dim);
- gtColIdx[iii] = i;
- tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
- iii++;
- }
-
- for (FunctionDesc metric : selectedMetrics) {
- int i = mapping.getIndexOf(metric);
- gtColIdx[iii] = i;
-
- if (metric.needRewrite()) {
- String rewriteFieldName = metric.getRewriteFieldName();
- tupleIdx[iii] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
- }
- // a non-rewrite metrics (like sum, or dimension playing as metrics) is like a dimension column
- else {
- TblColRef col = metric.getParameter().getColRefs().get(0);
- tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
- }
-
- MeasureType<?> measureType = metric.getMeasureType();
- if (measureType.needAdvancedTupleFilling()) {
- Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric));
- advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap));
- advMeasureIndexInGTValues.add(iii);
- } else {
- measureTypes[iii] = measureType;
- }
-
- iii++;
- }
-
- // prepare derived columns and filler
- Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
- for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
- TblColRef[] hostCols = entry.getKey().data;
- for (DeriveInfo deriveInfo : entry.getValue()) {
- IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo);
- if (filler != null) {
- derivedColFillers.add(filler);
- }
- }
- }
- }
-
- // load only needed dictionaries
- private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
- Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
- for (TblColRef col : columnsNeedDictionary) {
- result.put(col, cubeSeg.getDictionary(col));
- }
- return result;
- }
-
- public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) {
-
- record.getValues(gtColIdx, gtValues);
-
- // dimensions
- for (int i = 0; i < nSelectedDims; i++) {
- int ti = tupleIdx[i];
- if (ti >= 0) {
- tuple.setDimensionValue(ti, toString(gtValues[i]));
- }
- }
-
- // measures
- for (int i = nSelectedDims; i < gtColIdx.length; i++) {
- int ti = tupleIdx[i];
- if (ti >= 0 && measureTypes[i] != null) {
- measureTypes[i].fillTupleSimply(tuple, ti, gtValues[i]);
- }
- }
-
- // derived
- for (IDerivedColumnFiller filler : derivedColFillers) {
- filler.fillDerivedColumns(gtValues, tuple);
- }
-
- // advanced measure filling, due to possible row split, will complete at caller side
- if (advMeasureFillers.isEmpty()) {
- return null;
- } else {
- for (int i = 0; i < advMeasureFillers.size(); i++) {
- Object measureValue = gtValues[advMeasureIndexInGTValues.get(i)];
- advMeasureFillers.get(i).reload(measureValue);
- }
- return advMeasureFillers;
- }
- }
-
- private interface IDerivedColumnFiller {
- public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
- }
-
- private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) {
- boolean allHostsPresent = true;
- final int[] hostTmpIdx = new int[hostCols.length];
- for (int i = 0; i < hostCols.length; i++) {
- hostTmpIdx[i] = indexOnTheGTValues(hostCols[i]);
- allHostsPresent = allHostsPresent && hostTmpIdx[i] >= 0;
- }
-
- boolean needCopyDerived = false;
- final int[] derivedTupleIdx = new int[deriveInfo.columns.length];
- for (int i = 0; i < deriveInfo.columns.length; i++) {
- TblColRef col = deriveInfo.columns[i];
- derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
- needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0;
- }
-
- if ((allHostsPresent && needCopyDerived) == false)
- return null;
-
- switch (deriveInfo.type) {
- case LOOKUP:
- return new IDerivedColumnFiller() {
- CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
- LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
- int[] derivedColIdx = initDerivedColIdx();
- Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
-
- private int[] initDerivedColIdx() {
- int[] idx = new int[deriveInfo.columns.length];
- for (int i = 0; i < idx.length; i++) {
- idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
- }
- return idx;
- }
-
- @Override
- public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
- for (int i = 0; i < hostTmpIdx.length; i++) {
- lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
- }
-
- String[] lookupRow = lookupTable.getRow(lookupKey);
-
- if (lookupRow != null) {
- for (int i = 0; i < derivedTupleIdx.length; i++) {
- if (derivedTupleIdx[i] >= 0) {
- String value = lookupRow[derivedColIdx[i]];
- tuple.setDimensionValue(derivedTupleIdx[i], value);
- }
- }
- } else {
- for (int i = 0; i < derivedTupleIdx.length; i++) {
- if (derivedTupleIdx[i] >= 0) {
- tuple.setDimensionValue(derivedTupleIdx[i], null);
- }
- }
- }
- }
- };
- case PK_FK:
- return new IDerivedColumnFiller() {
- @Override
- public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
- // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
- tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
- }
- };
- default:
- throw new IllegalArgumentException();
- }
- }
-
- private int indexOnTheGTValues(TblColRef col) {
- List<TblColRef> cuboidDims = cuboid.getColumns();
- int cuboidIdx = cuboidDims.indexOf(col);
- for (int i = 0; i < gtColIdx.length; i++) {
- if (gtColIdx[i] == cuboidIdx)
- return i;
- }
- return -1;
- }
-
- private static String toString(Object o) {
- return o == null ? null : o.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
deleted file mode 100644
index f8b055c..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
 * Iterates query result tuples over a list of cube segment scanners, one
 * segment at a time, translating each {@link GTRecord} into a {@link Tuple}.
 *
 * <p>Also enforces the query's row limit and scan threshold from
 * {@link StorageContext}, and periodically reports scanned-row counts back to
 * the context.
 *
 * <p>NOTE(review): not thread-safe — per-query, single-threaded use assumed;
 * confirm against callers.
 */
public class SequentialCubeTupleIterator implements ITupleIterator {

    private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);

    protected final Cuboid cuboid;
    protected final Set<TblColRef> selectedDimensions;
    protected final Set<FunctionDesc> selectedMetrics;
    protected final TupleInfo tupleInfo;
    // Single reused Tuple instance: hasNext()/next() always hand back this same
    // object, refilled in place. Callers must consume a tuple before advancing.
    protected final Tuple tuple;
    protected final Iterator<CubeSegmentScanner> scannerIterator;
    protected final StorageContext context;

    // State for the segment currently being drained; all three are null
    // between segments.
    protected CubeSegmentScanner curScanner;
    protected Iterator<GTRecord> curRecordIterator;
    protected CubeTupleConverter curTupleConverter;
    // The pre-fetched tuple, set by hasNext() and cleared by next().
    protected Tuple next;

    // Advanced measures (e.g. TopN) expand one GTRecord into several tuples;
    // these track the remaining expansion rows of the current record.
    private List<IAdvMeasureFiller> advMeasureFillers;
    private int advMeasureRowsRemaining;
    private int advMeasureRowIndex;

    // Total rows fetched so far, and the portion not yet reported to context.
    private int scanCount;
    private int scanCountDelta;

    /**
     * @param scanners           segment scanners to drain in order
     * @param cuboid             the cuboid the records belong to
     * @param selectedDimensions dimensions requested by the query
     * @param selectedMetrics    metrics requested by the query
     * @param returnTupleInfo    layout of the tuples to produce
     * @param context            per-query storage context (limit, threshold, counters)
     */
    public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
        this.cuboid = cuboid;
        this.selectedDimensions = selectedDimensions;
        this.selectedMetrics = selectedMetrics;
        this.tupleInfo = returnTupleInfo;
        this.tuple = new Tuple(returnTupleInfo);
        this.scannerIterator = scanners.iterator();
        this.context = context;
    }

    /**
     * Pre-fetches the next tuple into {@code next}.
     *
     * @return true if another tuple is available
     * @throws ScanOutOfLimitException if the scan threshold is exceeded
     */
    @Override
    public boolean hasNext() {
        if (next != null)
            return true;

        if (hitLimitAndThreshold())
            return false;

        // consume any left rows from advanced measure filler
        if (advMeasureRowsRemaining > 0) {
            for (IAdvMeasureFiller filler : advMeasureFillers) {
                filler.fillTuple(tuple, advMeasureRowIndex);
            }
            advMeasureRowIndex++;
            advMeasureRowsRemaining--;
            next = tuple;
            return true;
        }

        // get the next GTRecord; advance to the next segment when needed
        if (curScanner == null) {
            if (scannerIterator.hasNext()) {
                curScanner = scannerIterator.next();
                curRecordIterator = curScanner.iterator();
                if (curRecordIterator.hasNext()) {
                    // if the segment does not have any tuples, don't bother to create a converter
                    curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
                }
            } else {
                return false;
            }
        }
        if (curRecordIterator.hasNext() == false) {
            // current segment drained: close it and recurse to try the next one
            close(curScanner);
            curScanner = null;
            curRecordIterator = null;
            curTupleConverter = null;
            return hasNext();
        }

        // now we have a GTRecord
        GTRecord curRecord = curRecordIterator.next();

        // translate into tuple; a non-null return means advanced measures need
        // to expand this record into multiple tuples
        advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple);

        // the simple case: one record, one tuple
        if (advMeasureFillers == null) {
            next = tuple;
            return true;
        }

        // advanced measure filling, like TopN, will produce multiple tuples out of one record;
        // all fillers must agree on the row count
        advMeasureRowsRemaining = -1;
        for (IAdvMeasureFiller filler : advMeasureFillers) {
            if (advMeasureRowsRemaining < 0)
                advMeasureRowsRemaining = filler.getNumOfRows();
            if (advMeasureRowsRemaining != filler.getNumOfRows())
                throw new IllegalStateException();
        }
        if (advMeasureRowsRemaining < 0)
            throw new IllegalStateException();

        advMeasureRowIndex = 0;
        return hasNext();
    }


    /**
     * Checks whether scanning must stop.
     *
     * @return true when the query's (limit + offset) has been reached
     * @throws ScanOutOfLimitException when the hard scan threshold is exceeded
     */
    private boolean hitLimitAndThreshold() {
        // check limit
        if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
            return true;
        }
        // check threshold
        if (scanCount >= context.getThreshold()) {
            throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
        }
        return false;
    }

    /**
     * Returns the pre-fetched tuple and updates scan accounting.
     * Note the returned object is the shared reusable {@code tuple} instance.
     */
    @Override
    public ITuple next() {
        // fetch next record
        if (next == null) {
            hasNext();
            if (next == null)
                throw new NoSuchElementException();
        }

        scanCount++;
        // batch scan-count reporting to the context to avoid per-row overhead
        if (++scanCountDelta >= 1000)
            flushScanCountDelta();

        ITuple result = next;
        next = null;
        return result;
    }

    /** Removal is not supported by this read-only iterator. */
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /**
     * Closes the current and all remaining segment scanners and flushes the
     * outstanding scan-count delta to the context.
     */
    @Override
    public void close() {
        // hasNext() loop may exit because of limit, threshold, etc.
        // close all the remaining segmentIterator
        flushScanCountDelta();

        if (curScanner != null)
            close(curScanner);

        while (scannerIterator.hasNext()) {
            close(scannerIterator.next());
        }
    }

    /** Closes a scanner, logging (not propagating) any IOException. */
    protected void close(CubeSegmentScanner scanner) {
        try {
            scanner.close();
        } catch (IOException e) {
            logger.error("Exception when close CubeScanner", e);
        }
    }

    /** @return total number of rows fetched via {@link #next()} so far */
    public int getScanCount() {
        return scanCount;
    }

    /** Reports the accumulated row count to the context and resets the delta. */
    private void flushScanCountDelta() {
        context.increaseTotalScanCount(scanCountDelta);
        scanCountDelta = 0;
    }

}