You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/05 00:09:59 UTC
[18/21] kylin git commit: KYLIN-976 Clean up,
remove all explicit checks on TopN & HLLC
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
index fbb258f..42946eb 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
@@ -166,7 +166,7 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
List<Map<TblColRef, String>> fuzzyValues = FuzzyValueCombination.calculate(fuzzyValueSet, FUZZY_VALUE_CAP);
for (Map<TblColRef, String> fuzzyValue : fuzzyValues) {
- result.add(new Pair<byte[], byte[]>(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue)));
+ result.add(Pair.newPair(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue)));
}
return result;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
index 152b4af..f2b34fc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
@@ -94,6 +94,6 @@ public class DFSFileTable implements ReadableTable {
lastModified = Math.max(lastModified, file.getModificationTime());
}
- return new Pair<Long, Long>(size, lastModified);
+ return Pair.newPair(size, lastModified);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 45cc88e..3dddece 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -157,8 +157,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
int colParamIdx = 0; // index among parameters of column type
for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
String value;
- if (function.isCount() || function.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
+ if (function.isCount()) {
value = "1";
} else if (param.isColumnType()) {
value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers);
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 3edaefb..daa610f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -170,9 +170,8 @@ public class CuboidJob extends AbstractHadoopJob {
// number of reduce tasks
int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
- // adjust reducer number for cube which has DISTINCT_COUNT measures for
- // better performance
- if (cubeDesc.hasHolisticCountDistinctMeasures()) {
+ // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+ if (cubeDesc.hasMemoryHungryMeasures()) {
numReduceTasks = numReduceTasks * 4;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b1527e6..18bce34 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -124,7 +124,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
MeasureDesc measureDesc = measureDescs.get(i);
MeasureType measureType = measureDesc.getFunction().getMeasureType();
if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
- dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
+ dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
}
}
if (dictMeasures.size() > 0) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index cf5049a..682a91c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -126,7 +126,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
MeasureDesc measureDesc = measureDescs.get(i);
MeasureType measureType = measureDesc.getFunction().getMeasureType();
if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
- dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
+ dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
}
}
if (dictMeasures.size() > 0) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 8f65691..c27b3f1 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -63,6 +63,7 @@ import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -324,11 +325,11 @@ public class SparkCubing extends AbstractApplication {
}
for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
- if (measureDesc.getFunction().isTopN()) {
- List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
- TblColRef col = colRefs.get(colRefs.size() - 1);
- dictionaryMap.put(col, cubeSegment.getDictionary(col));
- }
+ FunctionDesc func = measureDesc.getFunction();
+ List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
+ for (TblColRef col : colRefs) {
+ dictionaryMap.put(col, cubeSegment.getDictionary(col));
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
index c9a78a4..bd952a1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
@@ -20,12 +20,13 @@ package org.apache.kylin.invertedindex.measure;
import java.nio.ByteBuffer;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
import org.apache.kylin.metadata.datatype.DataType;
abstract public class FixedLenMeasureCodec<T> {
public static FixedLenMeasureCodec<?> get(DataType type) {
- if (type.isHLLC()) {
+ if (HLLCMeasureType.DATATYPE_HLLC.equals(type.getName())) {
return new FixedHLLCodec(type);
} else {
return new FixedPointLongCodec(type);
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
deleted file mode 100644
index 16748a6..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-import org.apache.kylin.storage.translate.HBaseKeyRange;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-/**
- */
-public class CubeSegmentTopNTupleIterator extends CubeSegmentTupleIterator{
-
- private Iterator<Tuple> innerResultIterator;
-
- public CubeSegmentTopNTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
- Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, //
- List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
- super(cubeSeg, keyRanges, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
- this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, topNCol);
- }
-
- @Override
- public boolean hasNext() {
- if (next != null)
- return true;
-
-
- if (innerResultIterator == null) {
- if (resultIterator == null) {
- if (rangeIterator.hasNext() == false)
- return false;
-
- resultIterator = doScan(rangeIterator.next());
- }
-
- if (resultIterator.hasNext() == false) {
- closeScanner();
- resultIterator = null;
- innerResultIterator = null;
- return hasNext();
- }
-
- Result result = resultIterator.next();
- scanCount++;
- if (++scanCountDelta >= 1000)
- flushScanCountDelta();
- innerResultIterator = tupleConverter.translateTopNResult(result, oneTuple);
- }
-
- if (innerResultIterator.hasNext()) {
- next = innerResultIterator.next();
- return true;
- } else {
- innerResultIterator = null;
- return hasNext();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 4acd8f8..50cfefa 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -99,7 +99,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
assert cuboid.equals(range.getCuboid());
}
- this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, null);
+ this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo);
this.oneTuple = new Tuple(returnTupleInfo);
this.rangeIterator = keyRanges.iterator();
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 063e254..c115f9c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -48,6 +48,7 @@ import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseMappingDesc;
import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
@@ -95,9 +96,8 @@ public class CubeStorageQuery implements ICachableStorageQuery {
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-
- // check whether this is a TopN query
- checkAndRewriteTopN(sqlDigest);
+ // allow custom measures hack
+ notifyBeforeStorageQuery(sqlDigest);
Collection<TblColRef> groups = sqlDigest.groupbyColumns;
TupleFilter filter = sqlDigest.filter;
@@ -378,12 +378,8 @@ public class CubeStorageQuery implements ICachableStorageQuery {
for (HBaseColumnDesc hbCol : hbCols) {
bestHBCol = hbCol;
bestIndex = hbCol.findMeasure(aggrFunc);
- MeasureDesc measure = hbCol.getMeasures()[bestIndex];
- // criteria for holistic measure: Exact Aggregation && Exact Cuboid
- if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
- logger.info("Holistic count distinct chosen for " + aggrFunc);
- break;
- }
+ // we used to prefer specific measure over another (holistic distinct count), now it's gone
+ break;
}
RowValueDecoder codec = codecMap.get(bestHBCol);
@@ -716,7 +712,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
}
private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
- if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
+ if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) {
return;
}
@@ -754,33 +750,11 @@ public class CubeStorageQuery implements ICachableStorageQuery {
ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
}
- private void checkAndRewriteTopN(SQLDigest sqlDigest) {
- FunctionDesc topnFunc = null;
- TblColRef topnLiteralCol = null;
+ private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
for (MeasureDesc measure : cubeDesc.getMeasures()) {
- FunctionDesc func = measure.getFunction();
- if (func.isTopN() && sqlDigest.groupbyColumns.contains(func.getTopNLiteralColumn())) {
- topnFunc = func;
- topnLiteralCol = func.getTopNLiteralColumn();
- }
+ MeasureType<?> measureType = measure.getFunction().getMeasureType();
+ measureType.beforeStorageQuery(measure, sqlDigest);
}
-
- // if TopN is not involved
- if (topnFunc == null)
- return;
-
- if (sqlDigest.aggregations.size() != 1) {
- throw new IllegalStateException("When query with topN, only one metrics is allowed.");
- }
-
- FunctionDesc origFunc = sqlDigest.aggregations.iterator().next();
- if (origFunc.isSum() == false) {
- throw new IllegalStateException("When query with topN, only SUM function is allowed.");
- }
-
- sqlDigest.aggregations = Lists.newArrayList(topnFunc);
- sqlDigest.groupbyColumns.remove(topnLiteralCol);
- sqlDigest.metricColumns.add(topnLiteralCol);
- logger.info("Rewrite function " + origFunc + " to " + topnFunc);
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index a669e1b..e414e08 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -2,18 +2,12 @@ package org.apache.kylin.storage.hbase.cube.v1;
import java.io.IOException;
import java.util.BitSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -40,19 +34,14 @@ public class CubeTupleConverter {
final int[] dimensionTupleIdx;
final int[][] metricsMeasureIdx;
final int[][] metricsTupleIdx;
- final TblColRef topNCol;
- int topNColTupleIdx;
- int topNMeasureTupleIdx;
- Dictionary<String> topNColDict;
- public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo, TblColRef topNCol) {
+ public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
this.tupleInfo = tupleInfo;
this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
this.rowValueDecoders = rowValueDecoders;
this.derivedColFillers = Lists.newArrayList();
- this.topNCol = topNCol;
List<TblColRef> dimCols = cuboid.getColumns();
@@ -92,13 +81,6 @@ public class CubeTupleConverter {
}
}
- if (this.topNCol != null) {
- this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
- this.topNMeasureTupleIdx = metricsTupleIdx[0][0];
-
- this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
- }
-
// prepare derived columns and filler
Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(dimCols, null);
for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -112,46 +94,6 @@ public class CubeTupleConverter {
}
}
- public Iterator<Tuple> translateTopNResult(Result hbaseRow, Tuple tuple) {
- translateResult(hbaseRow, tuple);
- Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx];
- assert (topNCounterObj instanceof TopNCounter);
- return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj);
- }
-
- private class TopNCounterTupleIterator implements Iterator {
-
- private Tuple tuple;
- private Iterator<Counter> topNCounterIterator;
- private Counter<ByteArray> counter;
-
- private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
- this.tuple = tuple;
- this.topNCounterIterator = topNCounter.iterator();
- }
-
- @Override
- public boolean hasNext() {
- return topNCounterIterator.hasNext();
- }
-
- @Override
- public Tuple next() {
- counter = topNCounterIterator.next();
- int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length);
- String colValue = topNColDict.getValueFromId(key);
- tuple.setDimensionValue(topNColTupleIdx, colValue);
- tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
-
- return tuple;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
public void translateResult(Result hbaseRow, Tuple tuple) {
try {
byte[] rowkey = hbaseRow.getRow();
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index 3f92cb0..e8dd5b9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
@@ -70,7 +69,8 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
- this.segmentIteratorList.add(newCubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo));
+ CubeSegmentTupleIterator it = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+ this.segmentIteratorList.add(it);
}
this.segmentIteratorIterator = this.segmentIteratorList.iterator();
@@ -81,16 +81,6 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
}
}
- private CubeSegmentTupleIterator newCubeSegmentTupleIterator(CubeSegment seg, List<HBaseKeyRange> keyRange, HConnection conn, Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context2, TupleInfo returnTupleInfo) {
- MeasureDesc topN = RowValueDecoder.findTopN(rowValueDecoders);
- if (topN != null) {
- TblColRef topNCol = topN.getFunction().getTopNLiteralColumn();
- return new CubeSegmentTopNTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
- } else {
- return new CubeSegmentTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
- }
- }
-
private Map<CubeSegment, List<HBaseKeyRange>> makeRangesMap(List<HBaseKeyRange> segmentKeyRanges) {
Map<CubeSegment, List<HBaseKeyRange>> map = Maps.newHashMap();
for (HBaseKeyRange range : segmentKeyRanges) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
index 521f111..5b16b04 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
@@ -32,6 +32,9 @@ import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants;
import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
@@ -122,6 +125,8 @@ public class ObserverAggregators {
final int nHCols;
final ByteBuffer[] hColValues;
final int nTotalMeasures;
+
+ MeasureType measureTypes[];
public ObserverAggregators(HCol[] _hcols) {
this.hcols = sort(_hcols);
@@ -150,11 +155,18 @@ public class ObserverAggregators {
}
public MeasureAggregator[] createBuffer() {
+ if (measureTypes == null) {
+ measureTypes = new MeasureType[nTotalMeasures];
+ int i = 0;
+ for (HCol col : hcols) {
+ for (int j = 0; j < col.nMeasures; j++)
+ measureTypes[i++] = MeasureTypeFactory.create(col.funcNames[j], DataType.getType(col.dataTypes[j]));
+ }
+ }
+
MeasureAggregator[] aggrs = new MeasureAggregator[nTotalMeasures];
- int i = 0;
- for (HCol col : hcols) {
- for (int j = 0; j < col.nMeasures; j++)
- aggrs[i++] = MeasureAggregator.create(col.funcNames[j], col.dataTypes[j]);
+ for (int i = 0; i < nTotalMeasures; i++) {
+ aggrs[i] = measureTypes[i].newAggregator();
}
return aggrs;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
index db05a40..ea33f9a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
@@ -80,7 +80,7 @@ public class FuzzyRowFilterV2 extends FilterBase {
for (int i = 0; i < fuzzyKeysData.size(); i++) {
p = fuzzyKeysData.get(i);
if (p.getFirst().length != p.getSecond().length) {
- Pair<String, String> readable = new Pair<String, String>(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p.getSecond()));
+ Pair<String, String> readable = Pair.newPair(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p.getSecond()));
throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable);
}
// update mask ( 0 -> -1 (0xff), 1 -> 0)
@@ -225,7 +225,7 @@ public class FuzzyRowFilterV2 extends FilterBase {
void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
byte[] nextRowKeyCandidate = getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(), currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
if (nextRowKeyCandidate != null) {
- nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
+ nextRows.add(Pair.newPair(nextRowKeyCandidate, fuzzyData));
}
}
@@ -263,7 +263,7 @@ public class FuzzyRowFilterV2 extends FilterBase {
FilterProtosExt.BytesBytesPair current = proto.getFuzzyKeysData(i);
byte[] keyBytes = current.getFirst().toByteArray();
byte[] keyMeta = current.getSecond().toByteArray();
- fuzzyKeysData.add(new Pair<byte[], byte[]>(keyBytes, keyMeta));
+ fuzzyKeysData.add(Pair.newPair(keyBytes, keyMeta));
}
return new FuzzyRowFilterV2(fuzzyKeysData);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 412e7602..6a6c887 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -134,7 +134,7 @@ public abstract class CubeHBaseRPC {
fuzzyKeyEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyKey);
fuzzyMaskEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyMask);
- ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask));
+ ret.add(Pair.newPair(hbaseFuzzyKey, hbaseFuzzyMask));
}
return ret;
@@ -153,7 +153,7 @@ public abstract class CubeHBaseRPC {
for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
if (selectedColBlocks.get(colBlkIndex)) {
byte[] byteQualifier = Bytes.toBytes(hbaseColDesc.getQualifier());
- result.add(new Pair<byte[], byte[]>(byteFamily, byteQualifier));
+ result.add(Pair.newPair(byteFamily, byteQualifier));
}
colBlkIndex++;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 53b690c..592451f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -52,7 +52,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- // check whether this is a TopN query
+ // allow custom measures hack
notifyBeforeStorageQuery(sqlDigest);
Collection<TblColRef> groups = sqlDigest.groupbyColumns;
@@ -86,10 +86,6 @@ public class CubeStorageQuery implements ICachableStorageQuery {
boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
context.setExactAggregation(isExactAggregation);
- if (isExactAggregation) {
- metrics = replaceHolisticCountDistinct(metrics);
- }
-
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
TupleFilter filterD = translateDerived(filter, groupsD);
@@ -111,22 +107,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
if (scanners.isEmpty())
return ITupleIterator.EMPTY_TUPLE_ITERATOR;
- return newSequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
- }
-
- private ITupleIterator newSequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> dimensionsD, Set<FunctionDesc> metrics, TupleInfo returnTupleInfo, StorageContext context) {
- TblColRef topNCol = null;
- for (FunctionDesc func : metrics) {
- if (func.isTopN()) {
- topNCol = func.getTopNLiteralColumn();
- break;
- }
- }
-
- if (topNCol != null)
- return new SequentialCubeTopNTupleIterator(scanners, cuboid, dimensionsD, topNCol, metrics, returnTupleInfo, context);
- else
- return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
+ return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
}
private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -248,27 +229,6 @@ public class CubeStorageQuery implements ICachableStorageQuery {
return exact;
}
- private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
- // for count distinct, try use its holistic version if possible
- Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
- for (FunctionDesc metric : metrics) {
- if (metric.isCountDistinct() == false) {
- result.add(metric);
- continue;
- }
-
- FunctionDesc holisticVersion = null;
- for (MeasureDesc measure : cubeDesc.getMeasures()) {
- FunctionDesc measureFunc = measure.getFunction();
- if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
- holisticVersion = measureFunc;
- }
- }
- result.add(holisticVersion == null ? metric : holisticVersion);
- }
- return result;
- }
-
@SuppressWarnings("unchecked")
private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
if (filter == null)
@@ -343,15 +303,13 @@ public class CubeStorageQuery implements ICachableStorageQuery {
}
private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
- boolean hasMemHungryCountDistinct = false;
+ boolean hasMemHungryMeasure = false;
for (FunctionDesc func : metrics) {
- if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
- hasMemHungryCountDistinct = true;
- }
+ hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry();
}
- // need to limit the memory usage for memory hungry count distinct
- if (hasMemHungryCountDistinct == false) {
+ // need to limit the memory usage for memory hungry measures
+ if (hasMemHungryMeasure == false) {
return;
}
@@ -380,6 +338,13 @@ public class CubeStorageQuery implements ICachableStorageQuery {
}
}
+ private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ MeasureType<?> measureType = measure.getFunction().getMeasureType();
+ measureType.beforeStorageQuery(measure, sqlDigest);
+ }
+ }
+
// ============================================================================
@Override
@@ -397,10 +362,4 @@ public class CubeStorageQuery implements ICachableStorageQuery {
return false;
}
- private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
- for (MeasureDesc measure : cubeDesc.getMeasures()) {
- MeasureType measureType = measure.getFunction().getMeasureType();
- measureType.beforeStorageQuery(measure, sqlDigest);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
index 3653af1..a7346af 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
@@ -18,17 +18,12 @@
package org.apache.kylin.storage.hbase.cube.v2;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -60,24 +55,19 @@ public class CubeTupleConverter {
final int[] gtColIdx;
final int[] tupleIdx;
final Object[] gtValues;
- final MeasureType[] measureTypes;
+ final MeasureType<?>[] measureTypes;
final List<IAdvMeasureFiller> advMeasureFillers;
final List<Integer> advMeasureIndexInGTValues;
final int nSelectedDims;
- final TblColRef topNCol;
- int topNColTupleIdx;
- int topNMeasureTupleIdx;
- Dictionary<String> topNColDict;
public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
- Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, TblColRef topNCol) {
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
this.tupleInfo = returnTupleInfo;
this.derivedColFillers = Lists.newArrayList();
- this.topNCol = topNCol;
List<TblColRef> cuboidDims = cuboid.getColumns();
CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
@@ -117,7 +107,7 @@ public class CubeTupleConverter {
tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
}
- MeasureType measureType = metric.getMeasureType();
+ MeasureType<?> measureType = metric.getMeasureType();
if (measureType.needAdvancedTupleFilling()) {
Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric));
advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap));
@@ -129,14 +119,6 @@ public class CubeTupleConverter {
iii++;
}
- if (this.topNCol != null) {
- this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
- // topN only allow 1 measure
- this.topNMeasureTupleIdx = tupleIdx[tupleIdx.length - 1];
-
- this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
- }
-
// prepare derived columns and filler
Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -196,13 +178,6 @@ public class CubeTupleConverter {
}
}
- public Iterator<Tuple> translateTopNResult(GTRecord record, Tuple tuple) {
- translateResult(record, tuple);
- Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx];
- assert (topNCounterObj instanceof TopNCounter);
- return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj);
- }
-
private interface IDerivedColumnFiller {
public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
}
@@ -292,38 +267,4 @@ public class CubeTupleConverter {
private static String toString(Object o) {
return o == null ? null : o.toString();
}
-
- private class TopNCounterTupleIterator implements Iterator {
-
- private Tuple tuple;
- private Iterator<Counter> topNCounterIterator;
- private Counter<ByteArray> counter;
-
- private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
- this.tuple = tuple;
- this.topNCounterIterator = topNCounter.iterator();
- }
-
- @Override
- public boolean hasNext() {
- return topNCounterIterator.hasNext();
- }
-
- @Override
- public Tuple next() {
- counter = topNCounterIterator.next();
- int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length);
- String colValue = topNColDict.getValueFromId(key);
- tuple.setDimensionValue(topNColTupleIdx, colValue);
- tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
-
- return tuple;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 303c360..ff7498b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -26,7 +26,6 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
deleted file mode 100644
index 9eeae4e..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Created by shaoshi on 10/28/15.
- */
-public class SequentialCubeTopNTupleIterator extends SequentialCubeTupleIterator {
-
- private Iterator<Tuple> innerResultIterator;
- private TblColRef topNCol;
-
- public SequentialCubeTopNTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
- TblColRef topNCol, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
-
- super(scanners, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context);
- this.topNCol = topNCol;
- }
-
- @Override
- public boolean hasNext() {
- if (next != null)
- return true;
- if (innerResultIterator == null) {
- if (curScanner == null) {
- if (scannerIterator.hasNext()) {
- curScanner = scannerIterator.next();
- curRecordIterator = curScanner.iterator();
- curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, topNCol);
- } else {
- return false;
- }
- }
-
- if (curRecordIterator.hasNext()) {
- innerResultIterator = curTupleConverter.translateTopNResult(curRecordIterator.next(), tuple);
- return hasNext();
- } else {
- close(curScanner);
- curScanner = null;
- curRecordIterator = null;
- curTupleConverter = null;
- innerResultIterator = null;
- return hasNext();
- }
-
- }
-
-
- if (innerResultIterator.hasNext()) {
- next = innerResultIterator.next();
- return true;
- } else {
- innerResultIterator = null;
- return hasNext();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index 5bf5e95..7812baf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -75,7 +75,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
if (scannerIterator.hasNext()) {
curScanner = scannerIterator.next();
curRecordIterator = curScanner.iterator();
- curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, null);
+ curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
index 036d9b8..10e80ae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
@@ -30,6 +30,7 @@ import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec;
import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.LongMutable;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -78,7 +79,7 @@ public class EndpointAggregators {
} else {
int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
Preconditions.checkState(index >= 0, "Column " + functionDesc.getParameter().getValue() + " is not found in II");
- if (functionDesc.isCountDistinct()) {
+ if (HLLCMeasureType.isCountDistinct(functionDesc)) {
return new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
} else {
return new MetricInfo(MetricType.Normal, index);
@@ -144,10 +145,10 @@ public class EndpointAggregators {
MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length];
for (int i = 0; i < aggrs.length; i++) {
if (metricInfos[i].type == MetricType.DistinctCount) {
- aggrs[i] = MeasureAggregator.create(funcNames[i], dataTypes[i]);
+ aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType(dataTypes[i]));
} else {
//all other fixed length measures can be aggregated as long
- aggrs[i] = MeasureAggregator.create(funcNames[i], "long");
+ aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType("long"));
}
}
return aggrs;
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index e638386..4ec421b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -42,6 +42,7 @@ import org.apache.kylin.common.util.RangeUtil;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -205,7 +206,7 @@ public class EndpointTupleIterator implements ITupleIterator {
boolean updated = false;
for (TblColRef column : columns) {
if (column.isSameAs(factTableName, functionDesc.getParameter().getValue())) {
- if (functionDesc.isCountDistinct()) {
+ if (HLLCMeasureType.isCountDistinct(functionDesc)) {
//TODO: default precision might need be configurable
String iiDefaultHLLC = "hllc10";
functionDesc.setReturnType(iiDefaultHLLC);
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index a7d0776..c798289 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -374,12 +374,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
int space = 0;
for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
DataType returnType = measureDesc.getFunction().getReturnDataType();
- if (returnType.isHLLC()) {
- // for HLL, it will be compressed when export to bytes
- space += returnType.getStorageBytesEstimate() * 0.75;
- } else {
- space += returnType.getStorageBytesEstimate();
- }
+ space += returnType.getStorageBytesEstimate();
}
bytesLength += space;
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 205c0db..e565e86 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -152,7 +152,7 @@ public class HBaseMROutput2 implements IMROutput2 {
this.parsedKey = new ByteArrayWritable();
this.parsedValue = new Object[measuresDescs.size()];
- this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+ this.parsedPair = Pair.newPair(parsedKey, parsedValue);
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index f190186..dff1e51 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -241,7 +241,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
if (parsedPair == null) {
parsedKey = new ByteArrayWritable();
parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
- parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+ parsedPair = Pair.newPair(parsedKey, parsedValue);
}
// merge by cuboid files
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 617af76..59a5fed 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -122,34 +122,21 @@ public class RowValueDecoder implements Cloneable {
}
}
- public boolean hasMemHungryCountDistinct() {
+ public boolean hasMemHungryMeasures() {
for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
FunctionDesc func = measures[i].getFunction();
- if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+ if (func.getMeasureType().isMemoryHungry())
return true;
- }
}
return false;
}
- public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) {
+ public static boolean hasMemHungryMeasures(Collection<RowValueDecoder> rowValueDecoders) {
for (RowValueDecoder decoder : rowValueDecoders) {
- if (decoder.hasMemHungryCountDistinct())
+ if (decoder.hasMemHungryMeasures())
return true;
}
return false;
}
- public static MeasureDesc findTopN(Collection<RowValueDecoder> rowValueDecoders) {
- for (RowValueDecoder decoder : rowValueDecoders) {
- for (int i = decoder.projectionIndex.nextSetBit(0); i >= 0; i = decoder.projectionIndex.nextSetBit(i + 1)) {
- MeasureDesc measure = decoder.measures[i];
- FunctionDesc func = measure.getFunction();
- if (func.isTopN())
- return measure;
- }
- }
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
index e71089a..8a88b6d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -325,7 +325,7 @@ public class GridTableHBaseBenchmark {
int logicRows = nRows / nColumns;
for (int i = 0; i < nColumns; i++) {
if (rand.nextDouble() < hitRatio) {
- hitsForColumnScan.add(new Pair<Integer, Integer>(i * logicRows, (i + 1) * logicRows));
+ hitsForColumnScan.add(Pair.newPair(i * logicRows, (i + 1) * logicRows));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
index f717d82..1d85922 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
@@ -167,7 +167,7 @@ public class TestFuzzyRowFilterV2EndToEnd {
}
buf.putInt(i);
- Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
+ Pair<byte[], byte[]> pair = Pair.newPair(fuzzyKey, mask);
list.add(pair);
}
@@ -200,7 +200,7 @@ public class TestFuzzyRowFilterV2EndToEnd {
}
buf.putInt(i * 2);
- Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
+ Pair<byte[], byte[]> pair = Pair.newPair(fuzzyKey, mask);
list.add(pair);
}
@@ -309,8 +309,8 @@ public class TestFuzzyRowFilterV2EndToEnd {
byte[] mask2 = new byte[] { 0, 0, 0, 0, 0, 0, 1, 1, 1, 1 };
- Pair<byte[], byte[]> pair1 = new Pair<byte[], byte[]>(fuzzyKey1, mask1);
- Pair<byte[], byte[]> pair2 = new Pair<byte[], byte[]>(fuzzyKey2, mask2);
+ Pair<byte[], byte[]> pair1 = Pair.newPair(fuzzyKey1, mask1);
+ Pair<byte[], byte[]> pair2 = Pair.newPair(fuzzyKey2, mask2);
FuzzyRowFilterV2 fuzzyRowFilter1 = new FuzzyRowFilterV2(Lists.newArrayList(pair1));
FuzzyRowFilterV2 fuzzyRowFilter2 = new FuzzyRowFilterV2(Lists.newArrayList(pair2));