You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/07/07 08:38:52 UTC
[50/50] [abbrv] kylin git commit: KYLIN-2606 Only return counter for
precise count_distinct if query is exactAggregate
KYLIN-2606 Only return counter for precise count_distinct if query is exactAggregate
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1f40225d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1f40225d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1f40225d
Branch: refs/heads/KYLIN-2606
Commit: 1f40225d1a80ece1babf107b3ef5cc7de8baacb7
Parents: d509542
Author: kangkaisen <ka...@163.com>
Authored: Wed Feb 15 19:53:17 2017 +0800
Committer: 康凯森 <ka...@meituan.com>
Committed: Fri Jul 7 13:47:12 2017 +0800
----------------------------------------------------------------------
.../kylin/cube/gridtable/CubeCodeSystem.java | 4 +
.../org/apache/kylin/gridtable/GTRecord.java | 8 +
.../kylin/gridtable/GTSampleCodeSystem.java | 4 +
.../apache/kylin/gridtable/IGTCodeSystem.java | 3 +
.../measure/bitmap/BitmapCounterFactory.java | 2 +
.../kylin/measure/bitmap/BitmapSerializer.java | 37 ++++-
.../measure/bitmap/RoaringBitmapCounter.java | 10 ++
.../bitmap/RoaringBitmapCounterFactory.java | 5 +
.../metadata/datatype/DataTypeSerializer.java | 13 ++
.../gtrecord/GTCubeStorageQueryBase.java | 53 ++++++-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 1 +
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +-
.../hbase/cube/v2/HBaseReadonlyStore.java | 33 +++-
.../coprocessor/endpoint/CubeVisitService.java | 2 +-
.../endpoint/generated/CubeVisitProtos.java | 151 +++++++++++++++----
.../endpoint/protobuf/CubeVisit.proto | 1 +
16 files changed, 291 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index aaa12a7..9eae6f3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -177,4 +177,8 @@ public class CubeCodeSystem implements IGTCodeSystem {
return result;
}
+ /** Returns {@code serializers[col]}, the serializer for GT column {@code col}. */
+ @Override
+ public DataTypeSerializer<?> getSerializer(int col) {
+ return serializers[col];
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 3e62ea7..f65e4b5 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -296,4 +296,12 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
}
}
+ /**
+ * Points the single column {@code selectedCol} at the bytes starting at the current position of
+ * {@code buf}; the length comes from {@code info.codeSystem.codeLength(selectedCol, buf)}.
+ * The data is not copied: the column references {@code buf.array()} directly, so {@code buf}
+ * must be array-backed. This method does not itself call {@code buf.position(int)}.
+ */
+ public void loadColumns(int selectedCol, ByteBuffer buf) {
+ int pos = buf.position();
+ int len = info.codeSystem.codeLength(selectedCol, buf);
+ cols[selectedCol].set(buf.array(), buf.arrayOffset() + pos, len);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
index 3f3c844..2a5e791 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -118,4 +118,8 @@ public class GTSampleCodeSystem implements IGTCodeSystem {
}
};
+ /** Returns {@code serializers[col]}, the serializer for GT column {@code col}. */
+ @Override
+ public DataTypeSerializer<?> getSerializer(int col) {
+ return serializers[col];
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
index 89dfc99..9c8ad6b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
public interface IGTCodeSystem {
@@ -62,4 +63,6 @@ public interface IGTCodeSystem {
/** Return aggregators for metrics */
MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions);
+ /** Returns the DataTypeSerializer used for the GT column at index {@code col}. */
+ DataTypeSerializer<?> getSerializer(int col);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
index da7748e..39aa1be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
@@ -26,5 +26,7 @@ public interface BitmapCounterFactory {
BitmapCounter newBitmap(int... values);
+ /**
+ * Creates a counter that carries only a precomputed cardinality (as decoded by
+ * BitmapSerializer from a final result), not bitmap contents; intended for getCount() only.
+ */
+ BitmapCounter newBitmap(long counter);
+
BitmapCounter newBitmap(ByteBuffer in) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index c1b260d..e2ec4cc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -28,6 +28,9 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
private static final BitmapCounter DELEGATE = factory.newBitmap();
+ // Compact "final result" encoding produced by getFinalResult():
+ // int IS_RESULT_FLAG followed by the long cardinality, 4 + 8 = 12 bytes.
+ // NOTE(review): assumes a serialized bitmap never begins with the int value 1 - confirm against the bitmap format.
+ private static final int IS_RESULT_FLAG = 1;
+ private static final int RESULT_SIZE = 12;
+
// called by reflection
public BitmapSerializer(DataType type) {
}
@@ -44,8 +47,12 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
@Override
public BitmapCounter deserialize(ByteBuffer in) {
try {
- return factory.newBitmap(in);
-
+ // test the flag itself (not peekLength(in) == RESULT_SIZE) so an ordinary bitmap whose
+ // serialized length happens to equal RESULT_SIZE is never mistaken for a final result
+ if (isFinalResult(in)) {
+ in.getInt(); // skip IS_RESULT_FLAG
+ return factory.newBitmap(in.getLong());
+ } else {
+ return factory.newBitmap(in);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -53,7 +60,12 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
@Override
public int peekLength(ByteBuffer in) {
- return DELEGATE.peekLength(in);
+ if (isFinalResult(in)) {
+ return RESULT_SIZE;
+ } else {
+ return DELEGATE.peekLength(in);
+ }
}
+
+ /**
+ * Returns true if the bytes at {@code in}'s current position are a final result written by
+ * getFinalResult(). Uses an absolute read, so neither the position nor a new slice is touched,
+ * and never reads past the limit of a short buffer.
+ */
+ private static boolean isFinalResult(ByteBuffer in) {
+ return in.remaining() >= 4 && in.getInt(in.position()) == IS_RESULT_FLAG;
+ }
@Override
@@ -71,4 +83,23 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
// It's difficult to decide the size before data was ingested, comparing with HLLCounter(16) as 64KB, here is assumption
return 8 * 1024;
}
+
+ @Override
+ public boolean supportDirectReturnResult() {
+ return true;
+ }
+
+ /**
+ * Replaces a serialized bitmap with its compact final result (IS_RESULT_FLAG + cardinality).
+ * Input that is already a final result is accepted as well, because deserialize() understands both forms.
+ * IOExceptions while reading the bitmap surface as RuntimeException (wrapped by deserialize()).
+ */
+ @Override
+ public ByteBuffer getFinalResult(ByteBuffer in) {
+ long count = deserialize(in).getCount();
+ ByteBuffer out = ByteBuffer.allocate(RESULT_SIZE);
+ out.putInt(IS_RESULT_FLAG);
+ out.putLong(count);
+ out.flip();
+ return out;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
index eec45f2..9929e24 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
@@ -35,6 +35,7 @@ import java.util.Iterator;
public class RoaringBitmapCounter implements BitmapCounter, Serializable {
private ImmutableRoaringBitmap bitmap;
+ // precomputed cardinality, set only by the RoaringBitmapCounter(long) constructor; null otherwise
+ private Long counter;
RoaringBitmapCounter() {
bitmap = new MutableRoaringBitmap();
@@ -44,6 +45,11 @@ public class RoaringBitmapCounter implements BitmapCounter, Serializable {
this.bitmap = bitmap;
}
+ /**
+ * Creates a result-only counter whose getCount() returns {@code counter}.
+ * NOTE(review): {@code bitmap} is left null here, so any method that dereferences it will throw
+ * NullPointerException on such an instance - confirm callers only use getCount().
+ */
+ RoaringBitmapCounter(long counter) {
+ this.counter = counter;
+ }
+
+
private MutableRoaringBitmap getMutableBitmap() {
if (bitmap instanceof MutableRoaringBitmap) {
return (MutableRoaringBitmap) bitmap;
@@ -86,6 +92,10 @@ public class RoaringBitmapCounter implements BitmapCounter, Serializable {
@Override
public long getCount() {
+ // a result-only counter reports its stored cardinality; it has no bitmap to inspect
+ if (counter != null) {
+ return counter;
+ }
+
return bitmap.getCardinality();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
index 822afa2..8ab908a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
@@ -40,6 +40,11 @@ public class RoaringBitmapCounterFactory implements BitmapCounterFactory, Serial
}
@Override
+ public BitmapCounter newBitmap(long counter) {
+ // result-only counter: getCount() returns `counter`; no underlying bitmap is created
+ return new RoaringBitmapCounter(counter);
+ }
+
+ @Override
public BitmapCounter newBitmap(ByteBuffer in) throws IOException {
RoaringBitmapCounter counter = new RoaringBitmapCounter();
counter.readFields(in);
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
index a4a35a4..2de38c0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -89,6 +89,19 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java.
throw new UnsupportedOperationException();
}
+ /**
+ * Returns true if this serializer can replace a serialized value with a compact final result via
+ * {@link #getFinalResult(ByteBuffer)}. This lets an exact-aggregation query with memory-hungry
+ * measures return final results directly, which speeds up the query.
+ * A serializer that returns true must override getFinalResult, and its deserialize and peekLength
+ * methods must also recognize the compact form; see {@link org.apache.kylin.measure.bitmap.BitmapSerializer}.
+ * The default implementation returns false.
+ */
+ public boolean supportDirectReturnResult() {
+ return false;
+ }
+
+ /**
+ * Optional: converts an expensive serialized value into a lightweight buffer that holds only its
+ * final result (for memory-hungry measures). The default implementation throws UnsupportedOperationException.
+ */
+ public ByteBuffer getFinalResult(ByteBuffer in) {
+ throw new UnsupportedOperationException();
+ }
+
/** Convert from obj to string */
public String toString(T value) {
if (value == null)
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index f89fc47..a21446a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -44,6 +44,7 @@ import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TblColRef;
@@ -135,6 +136,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
+ // exactAggregation means: no aggregation is needed in either the storage or the query engine
+ boolean exactAggregation = isExactAggregation(context, cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation, sqlDigest.aggregations);
+ context.setExactAggregation(exactAggregation);
+
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
Set<TblColRef> loosenedColumnD = Sets.newHashSet();
Set<TblColRef> filterColumnD = Sets.newHashSet();
@@ -428,7 +433,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
sublist.get(0).getFunction().getMeasureType().adjustSqlDigest(sublist, sqlDigest);
}
}
-
+
private TupleFilter checkHavingCanPushDown(TupleFilter havingFilter, Set<TblColRef> groupsD, List<FunctionDesc> aggregations, Set<FunctionDesc> metrics) {
// must have only one segment
Segments<CubeSegment> readySegs = cubeInstance.getSegments(SegmentStatusEnum.READY);
@@ -465,4 +470,50 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return havingFilter;
}
+ /**
+ * Decides whether the chosen cuboid answers the query exactly, i.e. neither the storage nor the
+ * query engine has to aggregate. Returns false (logging the reason) on the first failed condition.
+ */
+ private boolean isExactAggregation(StorageContext context, Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation, Collection<FunctionDesc> functionDescs) {
+ if (context.isNeedStorageAggregation()) {
+ logger.info("exactAggregation is false because need storage aggregation");
+ return false;
+ }
+
+ if (cuboid.requirePostAggregation()) {
+ logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId() + " requires post aggregation");
+ return false;
+ }
+
+ // derived aggregation is bad, unless expanded columns are already in group by
+ if (!groups.containsAll(derivedPostAggregation)) {
+ logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+ return false;
+ }
+
+ // other columns (from filter) are bad, unless they are ensured to have a single value
+ if (!singleValuesD.containsAll(othersD)) {
+ logger.info("exactAggregation is false because some column not on group by: " + othersD //
+ + " (single value column: " + singleValuesD + ")");
+ return false;
+ }
+
+ // for DimensionAsMetric like max(cal_dt), the dimension column may not be in the real group by
+ for (FunctionDesc functionDesc : functionDescs) {
+ if (functionDesc.isDimensionAsMetric()) {
+ logger.info("exactAggregation is false because has DimensionAsMetric");
+ return false;
+ }
+ }
+
+ // for a partitioned cube, the partition column must be in group by or have a single value,
+ // otherwise rows of different segments would still need merging.
+ // The null check is defensive: a model without a PartitionDesc is treated as unpartitioned.
+ PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
+ if (partDesc != null && partDesc.isPartitioned()) {
+ TblColRef col = partDesc.getPartitionDateColumnRef();
+ if (!groups.contains(col) && !singleValuesD.contains(col)) {
+ logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
+ return false;
+ }
+ }
+
+ logger.info("exactAggregation is true, cuboid id is " + cuboid.getId());
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index e822ada..af8754d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -163,6 +163,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
builder.setSpillEnabled(cubeSeg.getConfig().getQueryCoprocessorSpillEnabled());
builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
+ // lets the coprocessor's store replace measures that support it (e.g. bitmaps) with their final results
+ builder.setIsExactAggregate(storageContext.isExactAggregation());
for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
executorService.submit(new Runnable() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 951e2ef..a8f4fd8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -193,7 +193,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
};
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false);
+ // pass the exact-aggregation flag so the store can return final results for measures that support it
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false, storageContext.isExactAggregation());
IGTScanner rawScanner = store.scan(scanRequest);
final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 631e8e8..4ec0c9d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -44,18 +45,19 @@ public class HBaseReadonlyStore implements IGTStore {
private List<List<Integer>> hbaseColumnsToGT;
private int rowkeyPreambleSize;
private boolean withDelay = false;
-
+ private boolean isExactAggregation;
+ // GT columns whose serializer supports direct result return; computed once per store (not once
+ // per record) and left empty when the query is not an exact aggregation
+ private List<Integer> directReturnResultColumns;
/**
* @param withDelay is for test use
+ * @param isExactAggregation if true, values of columns whose serializer supports it are replaced by their final results
*/
- public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) {
+ public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay, boolean isExactAggregation) {
this.cellListIterator = cellListIterator;
this.info = gtScanRequest.getInfo();
this.hbaseColumns = hbaseColumns;
this.hbaseColumnsToGT = hbaseColumnsToGT;
this.rowkeyPreambleSize = rowkeyPreambleSize;
this.withDelay = withDelay;
+ this.isExactAggregation = isExactAggregation;
+ // serializers are only consulted for exact aggregation, as before
+ this.directReturnResultColumns = isExactAggregation ? findDirectReturnResultColumns() : new ArrayList<Integer>();
}
+
+ /** Returns the indexes of GT columns whose serializer supports direct result return. */
+ private List<Integer> findDirectReturnResultColumns() {
+ List<Integer> columns = new ArrayList<>();
+ for (int i = 0; i < info.getColumnCount(); i++) {
+ if (info.getCodeSystem().getSerializer(i).supportDirectReturnResult()) {
+ columns.add(i);
+ }
+ }
+ return columns;
+ }
@Override
@@ -132,6 +134,12 @@ public class HBaseReadonlyStore implements IGTStore {
buf = byteBuffer(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
oneRecord.loadColumns(hbaseColumnsToGT.get(i), buf);
}
+
+
+ if (isExactAggregation && !directReturnResultColumns.isEmpty()) {
+ trimGTRecord(oneRecord);
+ }
+
return oneRecord;
}
@@ -145,6 +153,27 @@ public class HBaseReadonlyStore implements IGTStore {
return ByteBuffer.wrap(array, offset, length);
}
+ /** Replaces the value of each direct-return column in {@code record} with its compact final result. */
+ private void trimGTRecord(GTRecord record) {
+ for (Integer i : directReturnResultColumns) {
+ ByteBuffer recordBuffer = record.get(i).asBuffer();
+ // columns without data are left untouched
+ if (recordBuffer != null) {
+ ByteBuffer trimmedBuffer = info.getCodeSystem().getSerializer(i).getFinalResult(recordBuffer);
+ record.loadColumns(i, trimmedBuffer);
+ }
+ }
+ }
+
};
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3b26b71..bb6555c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -295,7 +295,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
!request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client
scanReq.getTimeout());
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn());
+ // isExactAggregate is an optional proto field with default false, so requests that omit it keep full measure values
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(), request.getIsExactAggregate());
IGTScanner rawScanner = store.scan(scanReq);
IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled());
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index b9f2771..4c662c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -123,7 +123,7 @@ public final class CubeVisitProtos {
* <code>optional int64 maxScanBytes = 8;</code>
*
* <pre>
- * 0 means no limit
+ * must be positive
* </pre>
*/
boolean hasMaxScanBytes();
@@ -131,10 +131,20 @@ public final class CubeVisitProtos {
* <code>optional int64 maxScanBytes = 8;</code>
*
* <pre>
- * 0 means no limit
+ * must be positive
* </pre>
*/
long getMaxScanBytes();
+
+ // optional bool isExactAggregate = 9 [default = false];
+ /**
+ * <code>optional bool isExactAggregate = 9 [default = false];</code>
+ */
+ boolean hasIsExactAggregate();
+ /**
+ * <code>optional bool isExactAggregate = 9 [default = false];</code>
+ */
+ boolean getIsExactAggregate();
}
/**
* Protobuf type {@code CubeVisitRequest}
@@ -230,6 +240,11 @@ public final class CubeVisitProtos {
maxScanBytes_ = input.readInt64();
break;
}
+ case 72: {
+ bitField0_ |= 0x00000080;
+ isExactAggregate_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -982,7 +997,7 @@ public final class CubeVisitProtos {
* <code>optional int64 maxScanBytes = 8;</code>
*
* <pre>
- * 0 means no limit
+ * must be positive
* </pre>
*/
public boolean hasMaxScanBytes() {
@@ -992,13 +1007,29 @@ public final class CubeVisitProtos {
* <code>optional int64 maxScanBytes = 8;</code>
*
* <pre>
- * 0 means no limit
+ * must be positive
* </pre>
*/
public long getMaxScanBytes() {
return maxScanBytes_;
}
+ // optional bool isExactAggregate = 9 [default = false];
+ public static final int ISEXACTAGGREGATE_FIELD_NUMBER = 9;
+ private boolean isExactAggregate_;
+ /**
+ * <code>optional bool isExactAggregate = 9 [default = false];</code>
+ */
+ public boolean hasIsExactAggregate() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional bool isExactAggregate = 9 [default = false];</code>
+ */
+ public boolean getIsExactAggregate() {
+ return isExactAggregate_;
+ }
+
private void initFields() {
gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
@@ -1008,6 +1039,7 @@ public final class CubeVisitProtos {
queryId_ = "";
spillEnabled_ = true;
maxScanBytes_ = 0L;
+ isExactAggregate_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -1061,6 +1093,9 @@ public final class CubeVisitProtos {
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeInt64(8, maxScanBytes_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeBool(9, isExactAggregate_);
+ }
getUnknownFields().writeTo(output);
}
@@ -1102,6 +1137,10 @@ public final class CubeVisitProtos {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(8, maxScanBytes_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(9, isExactAggregate_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1162,6 +1201,11 @@ public final class CubeVisitProtos {
result = result && (getMaxScanBytes()
== other.getMaxScanBytes());
}
+ result = result && (hasIsExactAggregate() == other.hasIsExactAggregate());
+ if (hasIsExactAggregate()) {
+ result = result && (getIsExactAggregate()
+ == other.getIsExactAggregate());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -1207,6 +1251,10 @@ public final class CubeVisitProtos {
hash = (37 * hash) + MAXSCANBYTES_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getMaxScanBytes());
}
+ if (hasIsExactAggregate()) {
+ hash = (37 * hash) + ISEXACTAGGREGATE_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getIsExactAggregate());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -1337,6 +1385,8 @@ public final class CubeVisitProtos {
bitField0_ = (bitField0_ & ~0x00000040);
maxScanBytes_ = 0L;
bitField0_ = (bitField0_ & ~0x00000080);
+ isExactAggregate_ = false;
+ bitField0_ = (bitField0_ & ~0x00000100);
return this;
}
@@ -1402,6 +1452,10 @@ public final class CubeVisitProtos {
to_bitField0_ |= 0x00000040;
}
result.maxScanBytes_ = maxScanBytes_;
+ if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.isExactAggregate_ = isExactAggregate_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1469,6 +1523,9 @@ public final class CubeVisitProtos {
if (other.hasMaxScanBytes()) {
setMaxScanBytes(other.getMaxScanBytes());
}
+ if (other.hasIsExactAggregate()) {
+ setIsExactAggregate(other.getIsExactAggregate());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -2068,7 +2125,7 @@ public final class CubeVisitProtos {
* <code>optional int64 maxScanBytes = 8;</code>
*
* <pre>
- * 0 means no limit
+ * must be positive
* </pre>
*/
public boolean hasMaxScanBytes() {
@@ -2078,7 +2135,7 @@ public final class CubeVisitProtos {
* <code>optional int64 maxScanBytes = 8;</code>
*
* <pre>
- * 0 means no limit
+ * must be positive
* </pre>
*/
public long getMaxScanBytes() {
@@ -2088,7 +2145,7 @@ public final class CubeVisitProtos {
* <code>optional int64 maxScanBytes = 8;</code>
*
* <pre>
- * 0 means no limit
+ * must be positive
* </pre>
*/
public Builder setMaxScanBytes(long value) {
@@ -2101,7 +2158,7 @@ public final class CubeVisitProtos {
* <code>optional int64 maxScanBytes = 8;</code>
*
* <pre>
- * 0 means no limit
+ * must be positive
* </pre>
*/
public Builder clearMaxScanBytes() {
@@ -2111,6 +2168,39 @@ public final class CubeVisitProtos {
return this;
}
+ // optional bool isExactAggregate = 9 [default = false];
+ private boolean isExactAggregate_ ;
+ /**
+ * <code>optional bool isExactAggregate = 9 [default = false];</code>
+ */
+ public boolean hasIsExactAggregate() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ /**
+ * <code>optional bool isExactAggregate = 9 [default = false];</code>
+ */
+ public boolean getIsExactAggregate() {
+ return isExactAggregate_;
+ }
+ /**
+ * <code>optional bool isExactAggregate = 9 [default = false];</code>
+ */
+ public Builder setIsExactAggregate(boolean value) {
+ bitField0_ |= 0x00000100;
+ isExactAggregate_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool isExactAggregate = 9 [default = false];</code>
+ */
+ public Builder clearIsExactAggregate() {
+ bitField0_ = (bitField0_ & ~0x00000100);
+ isExactAggregate_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:CubeVisitRequest)
}
@@ -5516,32 +5606,33 @@ public final class CubeVisitProtos {
java.lang.String[] descriptorData = {
"\npstorage-hbase/src/main/java/org/apache" +
"/kylin/storage/hbase/cube/v2/coprocessor" +
- "/endpoint/protobuf/CubeVisit.proto\"\205\002\n\020C" +
+ "/endpoint/protobuf/CubeVisit.proto\"\246\002\n\020C" +
"ubeVisitRequest\022\025\n\rgtScanRequest\030\001 \002(\014\022\024" +
"\n\014hbaseRawScan\030\002 \002(\014\022\032\n\022rowkeyPreambleSi" +
"ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 \003(\0132\031.Cub" +
"eVisitRequest.IntList\022\027\n\017kylinProperties" +
"\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" +
- "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\032\027\n\007Int" +
- "List\022\014\n\004ints\030\001 \003(\005\"\253\004\n\021CubeVisitResponse",
- "\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132" +
- "\030.CubeVisitResponse.Stats\022/\n\terrorInfo\030\003" +
- " \001(\0132\034.CubeVisitResponse.ErrorInfo\032\220\002\n\005S" +
- "tats\022\030\n\020serviceStartTime\030\001 \001(\003\022\026\n\016servic" +
- "eEndTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022" +
- "\032\n\022aggregatedRowCount\030\004 \001(\003\022\025\n\rsystemCpu" +
- "Load\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001" +
- "(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostnam" +
- "e\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplet" +
- "e\030\n \001(\005\022\024\n\014scannedBytes\030\013 \001(\003\032H\n\tErrorIn",
- "fo\022*\n\004type\030\001 \002(\0162\034.CubeVisitResponse.Err" +
- "orType\022\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014" +
- "UNKNOWN_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_" +
- "LIMIT_EXCEEDED\020\0022F\n\020CubeVisitService\0222\n\t" +
- "visitCube\022\021.CubeVisitRequest\032\022.CubeVisit" +
- "ResponseB`\nEorg.apache.kylin.storage.hba" +
- "se.cube.v2.coprocessor.endpoint.generate" +
- "dB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+ "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\022\037\n\020isE" +
+ "xactAggregate\030\t \001(\010:\005false\032\027\n\007IntList\022\014\n",
+ "\004ints\030\001 \003(\005\"\253\004\n\021CubeVisitResponse\022\026\n\016com" +
+ "pressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeV" +
+ "isitResponse.Stats\022/\n\terrorInfo\030\003 \001(\0132\034." +
+ "CubeVisitResponse.ErrorInfo\032\220\002\n\005Stats\022\030\n" +
+ "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
+ "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022\032\n\022aggr" +
+ "egatedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 " +
+ "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" +
+ "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" +
+ "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005",
+ "\022\024\n\014scannedBytes\030\013 \001(\003\032H\n\tErrorInfo\022*\n\004t" +
+ "ype\030\001 \002(\0162\034.CubeVisitResponse.ErrorType\022" +
+ "\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014UNKNOWN" +
+ "_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_LIMIT_E" +
+ "XCEEDED\020\0022F\n\020CubeVisitService\0222\n\tvisitCu" +
+ "be\022\021.CubeVisitRequest\032\022.CubeVisitRespons" +
+ "eB`\nEorg.apache.kylin.storage.hbase.cube" +
+ ".v2.coprocessor.endpoint.generatedB\017Cube" +
+ "VisitProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5553,7 +5644,7 @@ public final class CubeVisitProtos {
internal_static_CubeVisitRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CubeVisitRequest_descriptor,
- new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", });
+ new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", "IsExactAggregate", });
internal_static_CubeVisitRequest_IntList_descriptor =
internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index aa83595..8ca8756 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -38,6 +38,7 @@ message CubeVisitRequest {
optional string queryId = 6;
optional bool spillEnabled = 7 [default = true];
optional int64 maxScanBytes = 8; // must be positive
+ optional bool isExactAggregate = 9 [default = false];
message IntList {
repeated int32 ints = 1;
}