You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/10 09:03:50 UTC
[16/17] kylin git commit: KYLIN-2437 collect number of bytes scanned to query metrics
KYLIN-2437 collect number of bytes scanned to query metrics
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e09338b3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e09338b3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e09338b3
Branch: refs/heads/KYLIN-2428
Commit: e09338b34c0b07a7167096e45bf9185aa0d0cbd5
Parents: ecf6a69
Author: gaodayue <ga...@meituan.com>
Authored: Wed Feb 8 13:59:31 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Fri Feb 10 15:39:11 2017 +0800
----------------------------------------------------------------------
.../kylin/gridtable/GTAggregateScanner.java | 11 +-
.../apache/kylin/gridtable/GTScanRequest.java | 10 +-
.../apache/kylin/gridtable/ScannerWorker.java | 64 ---------
.../apache/kylin/storage/StorageContext.java | 9 ++
.../storage/gtrecord/CubeSegmentScanner.java | 3 +-
.../kylin/storage/gtrecord/ScannerWorker.java | 71 ++++++++++
.../apache/kylin/rest/response/SQLResponse.java | 10 ++
.../apache/kylin/rest/service/QueryService.java | 9 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 7 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 6 +-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 17 ++-
.../coprocessor/endpoint/CubeVisitService.java | 142 +++++++++++--------
.../endpoint/generated/CubeVisitProtos.java | 107 ++++++++++++--
.../endpoint/protobuf/CubeVisit.proto | 1 +
14 files changed, 309 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 147dbc1..dd359f8 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -65,7 +65,6 @@ public class GTAggregateScanner implements IGTScanner {
final AggregationCache aggrCache;
final long spillThreshold; // 0 means no memory control && no spill
final int storagePushDownLimit;//default to be Int.MAX
- final long deadline;
final boolean spillEnabled;
private int aggregatedRowCount = 0;
@@ -73,10 +72,10 @@ public class GTAggregateScanner implements IGTScanner {
private boolean[] aggrMask;
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
- this(inputScanner, req, Long.MAX_VALUE, true);
+ this(inputScanner, req, true);
}
- public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, long deadline, boolean spillEnabled) {
+ public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean spillEnabled) {
if (!req.hasAggregation())
throw new IllegalStateException();
@@ -90,7 +89,6 @@ public class GTAggregateScanner implements IGTScanner {
this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
this.aggrMask = new boolean[metricsAggrFuncs.length];
this.storagePushDownLimit = req.getStoragePushDownLimit();
- this.deadline = deadline;
this.spillEnabled = spillEnabled;
Arrays.fill(aggrMask, true);
@@ -145,11 +143,6 @@ public class GTAggregateScanner implements IGTScanner {
long count = 0;
for (GTRecord r : inputScanner) {
- //check deadline
- if (count % GTScanRequest.terminateCheckInterval == 1 && System.currentTimeMillis() > deadline) {
- throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count);
- }
-
if (getNumOfSpills() == 0) {
//check limit
boolean ret = aggrCache.aggregate(r, storagePushDownLimit);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 7c94f5a..651e5c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -156,7 +156,7 @@ public class GTScanRequest {
}
public IGTScanner decorateScanner(IGTScanner scanner) throws IOException {
- return decorateScanner(scanner, true, true, Long.MAX_VALUE);
+ return decorateScanner(scanner, true, true);
}
/**
@@ -165,14 +165,14 @@ public class GTScanRequest {
*
* Refer to CoprocessorBehavior for explanation
*/
- public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, long deadline) throws IOException {
- return decorateScanner(scanner, filterToggledOn, aggrToggledOn, false, deadline, true);
+ public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn) throws IOException {
+ return decorateScanner(scanner, filterToggledOn, aggrToggledOn, false, true);
}
/**
* hasPreFiltered indicate the data has been filtered before scanning
*/
- public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, boolean hasPreFiltered, long deadline, boolean spillEnabled) throws IOException {
+ public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, boolean hasPreFiltered, boolean spillEnabled) throws IOException {
IGTScanner result = scanner;
if (!filterToggledOn) { //Skip reading this section if you're not profiling!
int scanned = lookAndForget(result);
@@ -194,7 +194,7 @@ public class GTScanRequest {
} else if (this.hasAggregation()) {
logger.info("pre aggregating results before returning");
this.doingStorageAggregation = true;
- result = new GTAggregateScanner(result, this, deadline, spillEnabled);
+ result = new GTAggregateScanner(result, this, spillEnabled);
} else {
logger.info("has no aggregation, skip it");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
deleted file mode 100644
index f26d993..0000000
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.gridtable;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Iterator;
-
-import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ScannerWorker {
-
- private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
- private IGTScanner internal = null;
-
- public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) {
- if (scanRequest == null) {
- logger.info("Segment {} will be skipped", segment);
- internal = new EmptyGTScanner(0);
- return;
- }
-
- final GTInfo info = scanRequest.getInfo();
-
- try {
- IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior
- internal = rpc.getGTScanner(scanRequest);
- } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
- throw new RuntimeException(e);
- }
- }
-
- public Iterator<GTRecord> iterator() {
- return internal.iterator();
- }
-
- public void close() throws IOException {
- internal.close();
- }
-
- public long getScannedRowCount() {
- return internal.getScannedRowCount();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index ab0ea73..708dfde 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -50,6 +50,7 @@ public class StorageContext {
private IStorageQuery storageQuery;
private AtomicLong totalScanCount = new AtomicLong();
+ private AtomicLong totalScanBytes = new AtomicLong();
private Cuboid cuboid;
private boolean partialResultReturned = false;
@@ -161,6 +162,14 @@ public class StorageContext {
return this.totalScanCount.addAndGet(count);
}
+ public long getTotalScanBytes() {
+ return totalScanBytes.get();
+ }
+
+ public long increaseTotalScanBytes(long bytes) {
+ return totalScanBytes.addAndGet(bytes);
+ }
+
public boolean isAcceptPartialResult() {
return acceptPartialResult;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 9d6f946..974b8ea 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -30,7 +30,6 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.gridtable.ScannerWorker;
import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
import org.apache.kylin.metadata.filter.StringCodeSystem;
import org.apache.kylin.metadata.filter.TupleFilter;
@@ -79,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner {
}
scanRequest = scanRangePlanner.planScanRequest();
String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
- scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
+ scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
new file mode 100644
index 0000000..2a2a86a
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.EmptyGTScanner;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStorage;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+
+public class ScannerWorker {
+
+ private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
+ private IGTScanner internal = null;
+
+ public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) {
+ if (scanRequest == null) {
+ logger.info("Segment {} will be skipped", segment);
+ internal = new EmptyGTScanner(0);
+ return;
+ }
+
+ final GTInfo info = scanRequest.getInfo();
+
+ try {
+ IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior
+ internal = rpc.getGTScanner(scanRequest);
+ } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Iterator<GTRecord> iterator() {
+ return internal.iterator();
+ }
+
+ public void close() throws IOException {
+ internal.close();
+ }
+
+ public long getScannedRowCount() {
+ return internal.getScannedRowCount();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 9c4e9da..387e6c9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -55,6 +55,8 @@ public class SQLResponse implements Serializable {
protected long totalScanCount;
+ protected long totalScanBytes;
+
protected boolean hitExceptionCache = false;
protected boolean storageCacheUsed = false;
@@ -150,6 +152,14 @@ public class SQLResponse implements Serializable {
this.totalScanCount = totalScanCount;
}
+ public long getTotalScanBytes() {
+ return totalScanBytes;
+ }
+
+ public void setTotalScanBytes(long totalScanBytes) {
+ this.totalScanBytes = totalScanBytes;
+ }
+
public boolean isHitExceptionCache() {
return hitExceptionCache;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 7ce38ea..9ccda03 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -276,6 +276,7 @@ public class QueryService extends BasicService {
stringBuilder.append("Realization Names: ").append(realizationNames).append(newLine);
stringBuilder.append("Cuboid Ids: ").append(cuboidIds).append(newLine);
stringBuilder.append("Total scan count: ").append(response.getTotalScanCount()).append(newLine);
+ stringBuilder.append("Total scan bytes: ").append(response.getTotalScanBytes()).append(newLine);
stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine);
stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine);
stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine);
@@ -580,15 +581,18 @@ public class QueryService extends BasicService {
boolean isPartialResult = false;
String cube = "";
- StringBuilder sb = new StringBuilder("Scan count for each storageContext: ");
+ StringBuilder sb = new StringBuilder("Scan stats for each storageContext: ");
long totalScanCount = 0;
+ long totalScanBytes = 0;
if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
if (ctx.realization != null) {
isPartialResult |= ctx.storageContext.isPartialResultReturned();
cube = ctx.realization.getName();
totalScanCount += ctx.storageContext.getTotalScanCount();
- sb.append(ctx.storageContext.getTotalScanCount() + ",");
+ totalScanBytes += ctx.storageContext.getTotalScanBytes();
+ sb.append("{rows=").append(ctx.storageContext.getTotalScanCount()).
+ append(" bytes=").append(ctx.storageContext.getTotalScanBytes()).append("} ");
}
}
}
@@ -596,6 +600,7 @@ public class QueryService extends BasicService {
SQLResponse response = new SQLResponse(columnMetas, results, cube, 0, false, null, isPartialResult);
response.setTotalScanCount(totalScanCount);
+ response.setTotalScanBytes(totalScanBytes);
return response;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index dd9f74c..a2b2611 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -47,6 +47,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -69,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private static ExecutorService executorService = new LoggableCachedThreadPool();
- public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
- super(segment, cuboid, fullGTInfo);
+ public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
+ super(segment, cuboid, fullGTInfo, context);
}
private byte[] getByteArrayForShort(short v) {
@@ -198,6 +199,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
if (region == null)
return;
+ context.increaseTotalScanBytes(result.getStats().getScannedBytes());
totalScannedCount.addAndGet(result.getStats().getScannedRowCount());
logger.info(logHeader + getStatsString(region, result));
@@ -280,6 +282,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
Stats stats = result.getStats();
sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append(".");
sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
+ sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". ");
sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". ");
sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 05b34c7..11fbb19 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -48,6 +48,7 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.IGTStorage;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,15 +63,18 @@ public abstract class CubeHBaseRPC implements IGTStorage {
final protected CubeSegment cubeSeg;
final protected Cuboid cuboid;
final protected GTInfo fullGTInfo;
+ final protected StorageContext context;
+
final private RowKeyEncoder fuzzyKeyEncoder;
final private RowKeyEncoder fuzzyMaskEncoder;
- public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
+ public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment");
this.cubeSeg = (CubeSegment) segment;
this.cuboid = cuboid;
this.fullGTInfo = fullGTInfo;
+ this.context = context;
this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index a52af90..b94346c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
@@ -41,6 +42,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,8 +88,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
}
- public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) {
- super(segment, cuboid, fullGTInfo);
+ public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) {
+ super(segment, cuboid, fullGTInfo, context);
}
@Override
@@ -180,12 +182,15 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
CellListIterator cellListIterator = new CellListIterator() {
+ long scanBytes = 0;
+
@Override
public void close() throws IOException {
for (ResultScanner scanner : scanners) {
scanner.close();
}
hbaseTable.close();
+ context.increaseTotalScanBytes(scanBytes);
}
@Override
@@ -195,7 +200,11 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
@Override
public List<Cell> next() {
- return allResultsIterator.next().listCells();
+ List<Cell> result = allResultsIterator.next().listCells();
+ for (Cell cell : result) {
+ scanBytes += CellUtil.estimatedSizeOf(cell);
+ }
+ return result;
}
@Override
@@ -232,4 +241,4 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
};
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3e0a065..1f6425f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -33,6 +33,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
@@ -87,7 +88,19 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
private long serviceStartTime;
- static class InnerScannerAsIterator implements CellListIterator {
+ abstract static class BaseCellListIterator implements CellListIterator {
+ @Override
+ public final void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void close() throws IOException {
+ // no op. we close all region scanners at final block.
+ }
+ }
+
+ static class InnerScannerAsIterator extends BaseCellListIterator {
private RegionScanner regionScanner;
private List<Cell> nextOne = Lists.newArrayList();
private List<Cell> ret = Lists.newArrayList();
@@ -127,15 +140,58 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
return ret;
}
+ }
+
+ // TODO move this logic to HBaseReadonlyStore once it's been refactored
+ static class ResourceTrackingCellListIterator extends BaseCellListIterator {
+ private final Iterator<List<Cell>> delegate;
+ private final long rowCountLimit;
+ private final long bytesLimit;
+ private final long timeout;
+ private final long deadline;
+
+ private long rowCount;
+ private long rowBytes;
+
+ ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate,
+ long rowCountLimit, long bytesLimit, long timeout) {
+ this.delegate = delegate;
+ this.rowCountLimit = rowCountLimit;
+ this.bytesLimit = bytesLimit;
+ this.timeout = timeout;
+ this.deadline = System.currentTimeMillis() + timeout;
+ }
@Override
- public void remove() {
- throw new UnsupportedOperationException();
+ public boolean hasNext() {
+ if (rowCount > rowCountLimit) {
+ throw new GTScanExceedThresholdException("Number of rows scanned exceeds threshold " + rowCountLimit);
+ }
+ if (rowBytes > bytesLimit) {
+ throw new GTScanExceedThresholdException("Scanned " + rowBytes + " bytes exceeds threshold " + bytesLimit);
+ }
+ if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) {
+ throw new GTScanTimeoutException("Scan timeout after " + timeout + " ms");
+ }
+ return delegate.hasNext();
}
@Override
- public void close() throws IOException {
- //does not need to close as regionScanner will be closed in finally block
+ public List<Cell> next() {
+ List<Cell> result = delegate.next();
+ rowCount++;
+ for (Cell cell : result) {
+ rowBytes += CellUtil.estimatedSizeOf(cell);
+ }
+ return result;
+ }
+
+ public long getTotalScannedRowCount() {
+ return rowCount;
+ }
+
+ public long getTotalScannedRowBytes() {
+ return rowBytes;
}
}
@@ -237,51 +293,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
final MutableBoolean scanNormalComplete = new MutableBoolean(true);
- final long deadline = serviceStartTime + scanReq.getTimeout();
- logger.info("deadline(local) is " + deadline);
final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
- final CellListIterator cellListIterator = new CellListIterator() {
-
- int counter = 0;
-
- @Override
- public void close() throws IOException {
- for (CellListIterator closeable : cellListsForeachRawScan) {
- closeable.close();
- }
- }
-
- @Override
- public boolean hasNext() {
-
- counter++;
-
- if (counter > scanReq.getStorageScanRowNumThreshold()) {
- throw new GTScanExceedThresholdException("Exceed scan threshold at " + counter + ", consider increasing kylin.query.memory-budget-bytes and kylin.query.scan-threshold");
- }
-
- if (counter % (10 * GTScanRequest.terminateCheckInterval) == 1) {
- logger.info("scanning " + counter + "th row from HBase.");
- }
- return allCellLists.hasNext();
- }
-
- @Override
- public List<Cell> next() {
- return allCellLists.next();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(
+ allCellLists,
+ scanReq.getStorageScanRowNumThreshold(),
+ Long.MAX_VALUE,
+ scanReq.getTimeout());
IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn());
IGTScanner rawScanner = store.scan(scanReq);
- IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, deadline, request.getSpillEnabled());
+ IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled());
ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
@@ -290,13 +313,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
try {
for (GTRecord oneRecord : finalScanner) {
-
- if (finalRowCount % GTScanRequest.terminateCheckInterval == 1) {
- if (System.currentTimeMillis() > deadline) {
- throw new GTScanTimeoutException("finalScanner timeouts after contributed " + finalRowCount);
- }
- }
-
buffer.clear();
try {
oneRecord.exportColumns(scanReq.getColumns(), buffer);
@@ -326,6 +342,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
appendProfileInfo(sb, "agg done");
+ logger.info("Total scanned {} rows and {} bytes",
+ cellListIterator.getTotalScannedRowCount(), cellListIterator.getTotalScannedRowBytes());
//outputStream.close() is not necessary
byte[] compressedAllRows;
@@ -341,6 +359,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
appendProfileInfo(sb, "compress done");
+ logger.info("Size of final result = {} ({} before compressing)", compressedAllRows.length, allRows.length);
OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
@@ -353,16 +372,17 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
done.run(responseBuilder.//
setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies
- setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
- setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
- setScannedRowCount(finalScanner.getScannedRowCount()).//
- setServiceStartTime(serviceStartTime).//
- setServiceEndTime(System.currentTimeMillis()).//
- setSystemCpuLoad(systemCpuLoad).//
- setFreePhysicalMemorySize(freePhysicalMemorySize).//
- setFreeSwapSpaceSize(freeSwapSpaceSize).//
- setHostname(InetAddress.getLocalHost().getHostName()).//
- setEtcMsg(sb.toString()).//
+ setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().
+ setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount).
+ setScannedRowCount(cellListIterator.getTotalScannedRowCount()).
+ setScannedBytes(cellListIterator.getTotalScannedRowBytes()).
+ setServiceStartTime(serviceStartTime).
+ setServiceEndTime(System.currentTimeMillis()).
+ setSystemCpuLoad(systemCpuLoad).
+ setFreePhysicalMemorySize(freePhysicalMemorySize).
+ setFreeSwapSpaceSize(freeSwapSpaceSize).
+ setHostname(InetAddress.getLocalHost().getHostName()).
+ setEtcMsg(sb.toString()).
setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build())
.//
build());
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index def0182..5a3aa5a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -2255,6 +2255,16 @@ public final class CubeVisitProtos {
* </pre>
*/
int getNormalComplete();
+
+ // optional int64 scannedBytes = 11;
+ /**
+ * <code>optional int64 scannedBytes = 11;</code>
+ */
+ boolean hasScannedBytes();
+ /**
+ * <code>optional int64 scannedBytes = 11;</code>
+ */
+ long getScannedBytes();
}
/**
* Protobuf type {@code CubeVisitResponse.Stats}
@@ -2357,6 +2367,11 @@ public final class CubeVisitProtos {
normalComplete_ = input.readInt32();
break;
}
+ case 88: {
+ bitField0_ |= 0x00000400;
+ scannedBytes_ = input.readInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2619,6 +2634,22 @@ public final class CubeVisitProtos {
return normalComplete_;
}
+ // optional int64 scannedBytes = 11;
+ public static final int SCANNEDBYTES_FIELD_NUMBER = 11;
+ private long scannedBytes_;
+ /**
+ * <code>optional int64 scannedBytes = 11;</code>
+ */
+ public boolean hasScannedBytes() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional int64 scannedBytes = 11;</code>
+ */
+ public long getScannedBytes() {
+ return scannedBytes_;
+ }
+
private void initFields() {
serviceStartTime_ = 0L;
serviceEndTime_ = 0L;
@@ -2630,6 +2661,7 @@ public final class CubeVisitProtos {
hostname_ = "";
etcMsg_ = "";
normalComplete_ = 0;
+ scannedBytes_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -2673,6 +2705,9 @@ public final class CubeVisitProtos {
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeInt32(10, normalComplete_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ output.writeInt64(11, scannedBytes_);
+ }
getUnknownFields().writeTo(output);
}
@@ -2722,6 +2757,10 @@ public final class CubeVisitProtos {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(10, normalComplete_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(11, scannedBytes_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -2792,6 +2831,11 @@ public final class CubeVisitProtos {
result = result && (getNormalComplete()
== other.getNormalComplete());
}
+ result = result && (hasScannedBytes() == other.hasScannedBytes());
+ if (hasScannedBytes()) {
+ result = result && (getScannedBytes()
+ == other.getScannedBytes());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -2848,6 +2892,10 @@ public final class CubeVisitProtos {
hash = (37 * hash) + NORMALCOMPLETE_FIELD_NUMBER;
hash = (53 * hash) + getNormalComplete();
}
+ if (hasScannedBytes()) {
+ hash = (37 * hash) + SCANNEDBYTES_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getScannedBytes());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -2977,6 +3025,8 @@ public final class CubeVisitProtos {
bitField0_ = (bitField0_ & ~0x00000100);
normalComplete_ = 0;
bitField0_ = (bitField0_ & ~0x00000200);
+ scannedBytes_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@@ -3045,6 +3095,10 @@ public final class CubeVisitProtos {
to_bitField0_ |= 0x00000200;
}
result.normalComplete_ = normalComplete_;
+ if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+ to_bitField0_ |= 0x00000400;
+ }
+ result.scannedBytes_ = scannedBytes_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -3095,6 +3149,9 @@ public final class CubeVisitProtos {
if (other.hasNormalComplete()) {
setNormalComplete(other.getNormalComplete());
}
+ if (other.hasScannedBytes()) {
+ setScannedBytes(other.getScannedBytes());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3550,6 +3607,39 @@ public final class CubeVisitProtos {
return this;
}
+ // optional int64 scannedBytes = 11;
+ private long scannedBytes_ ;
+ /**
+ * <code>optional int64 scannedBytes = 11;</code>
+ */
+ public boolean hasScannedBytes() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional int64 scannedBytes = 11;</code>
+ */
+ public long getScannedBytes() {
+ return scannedBytes_;
+ }
+ /**
+ * <code>optional int64 scannedBytes = 11;</code>
+ */
+ public Builder setScannedBytes(long value) {
+ bitField0_ |= 0x00000400;
+ scannedBytes_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 scannedBytes = 11;</code>
+ */
+ public Builder clearScannedBytes() {
+ bitField0_ = (bitField0_ & ~0x00000400);
+ scannedBytes_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats)
}
@@ -4349,20 +4439,21 @@ public final class CubeVisitProtos {
"ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 \003(\0132\031.Cub" +
"eVisitRequest.IntList\022\027\n\017kylinProperties" +
"\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" +
- "\007 \001(\010:\004true\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\321\002\n" +
+ "\007 \001(\010:\004true\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\347\002\n" +
"\021CubeVisitResponse\022\026\n\016compressedRows\030\001 \002",
"(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitResponse.St" +
- "ats\032\372\001\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022" +
+ "ats\032\220\002\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022" +
"\026\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scannedRowCou" +
"nt\030\003 \001(\003\022\032\n\022aggregatedRowCount\030\004 \001(\003\022\025\n\r" +
"systemCpuLoad\030\005 \001(\001\022\036\n\026freePhysicalMemor" +
"ySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020" +
"\n\010hostname\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016norm" +
- "alComplete\030\n \001(\0052F\n\020CubeVisitService\0222\n\t" +
- "visitCube\022\021.CubeVisitRequest\032\022.CubeVisit" +
- "ResponseB`\nEorg.apache.kylin.storage.hba",
- "se.cube.v2.coprocessor.endpoint.generate" +
- "dB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+ "alComplete\030\n \001(\005\022\024\n\014scannedBytes\030\013 \001(\0032F" +
+ "\n\020CubeVisitService\0222\n\tvisitCube\022\021.CubeVi" +
+ "sitRequest\032\022.CubeVisitResponseB`\nEorg.ap",
+ "ache.kylin.storage.hbase.cube.v2.coproce" +
+ "ssor.endpoint.generatedB\017CubeVisitProtos" +
+ "H\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4392,7 +4483,7 @@ public final class CubeVisitProtos {
internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CubeVisitResponse_Stats_descriptor,
- new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", });
+ new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", "ScannedBytes", });
return null;
}
};
http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index c7c2954..f416669 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -54,6 +54,7 @@ message CubeVisitResponse {
optional string hostname = 8;
optional string etcMsg = 9;
optional int32 normalComplete =10;//when the scan times out, normalComplete will be 0 (false)
+ optional int64 scannedBytes = 11;
}
required bytes compressedRows = 1;
required Stats stats = 2;