Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 05:53:08 UTC
[kylin] 01/04: KYLIN-4408 Change kylin.query.max-return-rows to kylin.query.max-return-bytes
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5c69a12037d2b8c99cbffe7541bfbc721c44c0b3
Author: Wang Ken <mi...@ebay.com>
AuthorDate: Tue Mar 10 11:39:49 2020 +0800
KYLIN-4408 Change kylin.query.max-return-rows to kylin.query.max-return-bytes
---
.../org/apache/kylin/common/KylinConfigBase.java | 6 +-
.../java/org/apache/kylin/common/QueryContext.java | 26 +++++---
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 75 +++++++++++-----------
3 files changed, 58 insertions(+), 49 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8eefc02..e2d7543 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1730,8 +1730,10 @@ public abstract class KylinConfigBase implements Serializable {
return value > 0 ? value : Long.MAX_VALUE;
}
- public long getQueryMaxReturnRows() {
- return Integer.parseInt(this.getOptional("kylin.query.max-return-rows", "5000000"));
+ public long getQueryMaxReturnBytes() {
+ long value = Long
+ .parseLong(this.getOptional("kylin.query.max-return-bytes", String.valueOf(3L * 1024 * 1024 * 1024)));
+ return value > 0 ? value : Long.MAX_VALUE;
}
public int getTranslatedInClauseMaxSize() {
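For reference, the new limit follows the same parse-and-clamp convention as the existing scan-bytes limit: the default is 3 GiB (3221225472 bytes), and any non-positive value disables the cap by falling back to Long.MAX_VALUE. Below is a minimal standalone sketch of that pattern, assuming only that the property string comes from kylin.properties; the class and method names are illustrative, not Kylin API.

    // Illustrative only: mirrors the parse-and-clamp logic of getQueryMaxReturnBytes().
    public class ReturnBytesLimit {
        static long parseLimit(String configured) {
            long value = Long.parseLong(
                    configured != null ? configured : String.valueOf(3L * 1024 * 1024 * 1024));
            return value > 0 ? value : Long.MAX_VALUE;  // non-positive means "no limit"
        }

        public static void main(String[] args) {
            System.out.println(parseLimit(null));  // 3221225472 (3 GiB default)
            System.out.println(parseLimit("0"));   // 9223372036854775807 (unlimited)
        }
    }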
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 88caa7d..85cc5f8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -57,6 +57,7 @@ public class QueryContext {
private AtomicLong scannedRows = new AtomicLong();
private AtomicLong returnedRows = new AtomicLong();
private AtomicLong scannedBytes = new AtomicLong();
+ private AtomicLong returnedBytes = new AtomicLong();
private Object calcitePlan;
private AtomicBoolean isRunning = new AtomicBoolean(true);
@@ -142,6 +143,14 @@ public class QueryContext {
return scannedBytes.addAndGet(deltaBytes);
}
+ public long getReturnedBytes() {
+ return returnedBytes.get();
+ }
+
+ public long addAndGetReturnedBytes(long deltaBytes) {
+ return returnedBytes.addAndGet(deltaBytes);
+ }
+
public void addQueryStopListener(QueryStopListener listener) {
this.stopListeners.add(listener);
}
@@ -222,14 +231,12 @@ public class QueryContext {
}
ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
if (segmentStatisticsMap == null) {
- logger.warn(
- "cubeSegmentStatistic should be initialized for cube {}", cubeName);
+ logger.warn("cubeSegmentStatistic should be initialized for cube {}", cubeName);
return null;
}
CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName);
if (segmentStatistics == null) {
- logger.warn(
- "segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName);
+ logger.warn("segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName);
return null;
}
return segmentStatistics;
@@ -280,16 +287,15 @@ public class QueryContext {
if (old == null) {
segmentStatistics.setWrapper(cubeName, segmentName, sourceCuboidId, targetCuboidId, filterMask);
} else if (segmentStatistics.sourceCuboidId != sourceCuboidId
- || segmentStatistics.targetCuboidId != targetCuboidId
- || segmentStatistics.filterMask != filterMask) {
+ || segmentStatistics.targetCuboidId != targetCuboidId || segmentStatistics.filterMask != filterMask) {
StringBuilder inconsistency = new StringBuilder();
if (segmentStatistics.sourceCuboidId != sourceCuboidId) {
- inconsistency.append(
- "sourceCuboidId exist " + segmentStatistics.sourceCuboidId + INPUT + sourceCuboidId);
+ inconsistency
+ .append("sourceCuboidId exist " + segmentStatistics.sourceCuboidId + INPUT + sourceCuboidId);
}
if (segmentStatistics.targetCuboidId != targetCuboidId) {
- inconsistency.append(
- "targetCuboidId exist " + segmentStatistics.targetCuboidId + INPUT + targetCuboidId);
+ inconsistency
+ .append("targetCuboidId exist " + segmentStatistics.targetCuboidId + INPUT + targetCuboidId);
}
if (segmentStatistics.filterMask != filterMask) {
inconsistency.append("filterMask exist " + segmentStatistics.filterMask + INPUT + filterMask);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 9a12ed8..56538af 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -362,14 +362,26 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
- stats.getFilteredRowCount(),
stats.getAggregatedRowCount(), stats.getScannedBytes());
+ byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
+ byte[] queueData = rawData;
if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
rpcException = new ResourceLimitExceededException(
"Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold "
+ cubeSeg.getConfig().getQueryMaxScanBytes());
- } else if (queryContext.getReturnedRows() > cubeSeg.getConfig().getQueryMaxReturnRows()) {
- rpcException = new ResourceLimitExceededException(
- "Query returned " + queryContext.getReturnedRows() + " rows exceeds threshold "
- + cubeSeg.getConfig().getQueryMaxReturnRows());
+ } else {
+ try {
+ if (compressionResult) {
+ queueData = CompressionUtils.decompress(rawData);
+ }
+ } catch (IOException | DataFormatException e) {
+ throw new RuntimeException(logHeader + "Error when decompressing", e);
+ }
+ if (queryContext.addAndGetReturnedBytes(queueData.length) > cubeSeg.getConfig()
+ .getQueryMaxReturnBytes()) {
+ rpcException = new ResourceLimitExceededException("Query returned "
+ + queryContext.getReturnedBytes() + " bytes exceeds threshold "
+ + cubeSeg.getConfig().getQueryMaxReturnBytes());
+ }
}
if (rpcException != null) {
@@ -377,42 +389,31 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
return;
}
- try {
- byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
- if (compressionResult) {
- epResultItr.append(CompressionUtils.decompress(rawData));
- } else {
- epResultItr.append(rawData);
- }
- // put segment query result to cache if cache is enabled
- if (querySegmentCacheEnabled) {
- try {
- segmentQueryResultBuilder.putRegionResult(rawData);
- if (segmentQueryResultBuilder.isComplete()) {
- CubeSegmentStatistics cubeSegmentStatistics = queryContext
- .getCubeSegmentStatistics(storageContext.ctxId,
- cubeSeg.getCubeInstance().getName(), cubeSeg.getName());
- if (cubeSegmentStatistics != null) {
- segmentQueryResultBuilder
- .setCubeSegmentStatistics(cubeSegmentStatistics);
- logger.info(
- "Query-{}: try to put segment query result to cache for segment:{}",
- queryContext.getQueryId(), cubeSeg);
- SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder
- .build();
- SegmentQueryCache.getInstance().put(segmentQueryCacheKey,
- segmentQueryResult);
- logger.info(
- "Query-{}: successfully put segment query result to cache for segment:{}",
- queryContext.getQueryId(), cubeSeg);
- }
+ epResultItr.append(queueData);
+ // put segment query result to cache if cache is enabled
+ if (querySegmentCacheEnabled) {
+ try {
+ segmentQueryResultBuilder.putRegionResult(rawData);
+ if (segmentQueryResultBuilder.isComplete()) {
+ CubeSegmentStatistics cubeSegmentStatistics = queryContext
+ .getCubeSegmentStatistics(storageContext.ctxId,
+ cubeSeg.getCubeInstance().getName(), cubeSeg.getName());
+ if (cubeSegmentStatistics != null) {
+ segmentQueryResultBuilder.setCubeSegmentStatistics(cubeSegmentStatistics);
+ logger.info(
+ "Query-{}: try to put segment query result to cache for segment:{}",
+ queryContext.getQueryId(), cubeSeg);
+ SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder.build();
+ SegmentQueryCache.getInstance().put(segmentQueryCacheKey,
+ segmentQueryResult);
+ logger.info(
+ "Query-{}: successfully put segment query result to cache for segment:{}",
+ queryContext.getQueryId(), cubeSeg);
}
- } catch (Throwable t) {
- logger.error("Fail to put query segment result to cache", t);
}
+ } catch (Throwable t) {
+ logger.error("Fail to put query segment result to cache", t);
}
- } catch (IOException | DataFormatException e) {
- throw new RuntimeException(logHeader + "Error when decompressing", e);
}
}
});
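Taken together, the CubeHBaseEndpointRPC change reorders the work so each per-region payload is decompressed before it is counted, which means the new limit applies to the materialized (decompressed) size rather than the compressed size on the wire. A condensed, self-contained sketch of that flow follows, with java.util.zip standing in for Kylin's CompressionUtils and a static AtomicLong standing in for QueryContext; all names below are illustrative, not the actual Kylin signatures.

    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.zip.Deflater;
    import java.util.zip.Inflater;

    public class ReturnBytesCheck {
        static final AtomicLong returnedBytes = new AtomicLong();

        // Decompress first, then charge the decompressed size against the per-query budget.
        static byte[] acceptRegionResult(byte[] rawData, boolean compressed, long maxReturnBytes)
                throws Exception {
            byte[] rows = rawData;
            if (compressed) {
                Inflater inflater = new Inflater();
                inflater.setInput(rawData);
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[64 * 1024];
                while (!inflater.finished()) {
                    out.write(buf, 0, inflater.inflate(buf));
                }
                rows = out.toByteArray();
            }
            if (returnedBytes.addAndGet(rows.length) > maxReturnBytes) {
                throw new RuntimeException("Query returned " + returnedBytes.get()
                        + " bytes exceeds threshold " + maxReturnBytes);
            }
            return rows;  // only appended to the result iterator when under the budget
        }

        public static void main(String[] args) throws Exception {
            byte[] payload = "some serialized cube rows".getBytes("UTF-8");
            Deflater deflater = new Deflater();
            deflater.setInput(payload);
            deflater.finish();
            byte[] buf = new byte[256];
            byte[] compressed = Arrays.copyOf(buf, deflater.deflate(buf));

            System.out.println(acceptRegionResult(compressed, true, 1024).length);  // 25
            acceptRegionResult(compressed, true, 10);  // exceeds the budget, throws
        }
    }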