You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 05:53:08 UTC

[kylin] 01/04: KYLIN-4408 Change kylin.query.max-return-rows to kylin.query.max-return-bytes

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5c69a12037d2b8c99cbffe7541bfbc721c44c0b3
Author: Wang Ken <mi...@ebay.com>
AuthorDate: Tue Mar 10 11:39:49 2020 +0800

    KYLIN-4408 Change kylin.query.max-return-rows to kylin.query.max-return-bytes
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  6 +-
 .../java/org/apache/kylin/common/QueryContext.java | 26 +++++---
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 75 +++++++++++-----------
 3 files changed, 58 insertions(+), 49 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8eefc02..e2d7543 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1730,8 +1730,10 @@ public abstract class KylinConfigBase implements Serializable {
         return value > 0 ? value : Long.MAX_VALUE;
     }
 
-    public long getQueryMaxReturnRows() {
-        return Integer.parseInt(this.getOptional("kylin.query.max-return-rows", "5000000"));
+    public long getQueryMaxReturnBytes() {
+        long value = Long
+                .parseLong(this.getOptional("kylin.query.max-return-bytes", String.valueOf(3L * 1024 * 1024 * 1024)));
+        return value > 0 ? value : Long.MAX_VALUE;
     }
 
     public int getTranslatedInClauseMaxSize() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 88caa7d..85cc5f8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -57,6 +57,7 @@ public class QueryContext {
     private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong returnedRows = new AtomicLong();
     private AtomicLong scannedBytes = new AtomicLong();
+    private AtomicLong returnedBytes = new AtomicLong();
     private Object calcitePlan;
 
     private AtomicBoolean isRunning = new AtomicBoolean(true);
@@ -142,6 +143,14 @@ public class QueryContext {
         return scannedBytes.addAndGet(deltaBytes);
     }
 
+    public long getReturnedBytes() {
+        return returnedBytes.get();
+    }
+
+    public long addAndGetReturnedBytes(long deltaBytes) {
+        return returnedBytes.addAndGet(deltaBytes);
+    }
+
     public void addQueryStopListener(QueryStopListener listener) {
         this.stopListeners.add(listener);
     }
@@ -222,14 +231,12 @@ public class QueryContext {
         }
         ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
         if (segmentStatisticsMap == null) {
-            logger.warn(
-                    "cubeSegmentStatistic should be initialized for cube {}", cubeName);
+            logger.warn("cubeSegmentStatistic should be initialized for cube {}", cubeName);
             return null;
         }
         CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName);
         if (segmentStatistics == null) {
-            logger.warn(
-                    "segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName);
+            logger.warn("segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName);
             return null;
         }
         return segmentStatistics;
@@ -280,16 +287,15 @@ public class QueryContext {
         if (old == null) {
             segmentStatistics.setWrapper(cubeName, segmentName, sourceCuboidId, targetCuboidId, filterMask);
         } else if (segmentStatistics.sourceCuboidId != sourceCuboidId
-                || segmentStatistics.targetCuboidId != targetCuboidId
-                || segmentStatistics.filterMask != filterMask) {
+                || segmentStatistics.targetCuboidId != targetCuboidId || segmentStatistics.filterMask != filterMask) {
             StringBuilder inconsistency = new StringBuilder();
             if (segmentStatistics.sourceCuboidId != sourceCuboidId) {
-                inconsistency.append(
-                        "sourceCuboidId exist " + segmentStatistics.sourceCuboidId + INPUT + sourceCuboidId);
+                inconsistency
+                        .append("sourceCuboidId exist " + segmentStatistics.sourceCuboidId + INPUT + sourceCuboidId);
             }
             if (segmentStatistics.targetCuboidId != targetCuboidId) {
-                inconsistency.append(
-                        "targetCuboidId exist " + segmentStatistics.targetCuboidId + INPUT + targetCuboidId);
+                inconsistency
+                        .append("targetCuboidId exist " + segmentStatistics.targetCuboidId + INPUT + targetCuboidId);
             }
             if (segmentStatistics.filterMask != filterMask) {
                 inconsistency.append("filterMask exist " + segmentStatistics.filterMask + INPUT + filterMask);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 9a12ed8..56538af 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -362,14 +362,26 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                             - stats.getFilteredRowCount(),
                                     stats.getAggregatedRowCount(), stats.getScannedBytes());
 
+                            byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
+                            byte[] queueData = rawData;
                             if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
                                 rpcException = new ResourceLimitExceededException(
                                         "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold "
                                                 + cubeSeg.getConfig().getQueryMaxScanBytes());
-                            } else if (queryContext.getReturnedRows() > cubeSeg.getConfig().getQueryMaxReturnRows()) {
-                                rpcException = new ResourceLimitExceededException(
-                                        "Query returned " + queryContext.getReturnedRows() + " rows exceeds threshold "
-                                                + cubeSeg.getConfig().getQueryMaxReturnRows());
+                            } else {
+                                try {
+                                    if (compressionResult) {
+                                        queueData = CompressionUtils.decompress(rawData);
+                                    }
+                                } catch (IOException | DataFormatException e) {
+                                    throw new RuntimeException(logHeader + "Error when decompressing", e);
+                                }
+                                if (queryContext.addAndGetReturnedBytes(queueData.length) > cubeSeg.getConfig()
+                                        .getQueryMaxReturnBytes()) {
+                                    rpcException = new ResourceLimitExceededException("Query returned "
+                                            + queryContext.getReturnedBytes() + " bytes exceeds threshold "
+                                            + cubeSeg.getConfig().getQueryMaxReturnBytes());
+                                }
                             }
 
                             if (rpcException != null) {
@@ -377,42 +389,31 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                 return;
                             }
 
-                            try {
-                                byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
-                                if (compressionResult) {
-                                    epResultItr.append(CompressionUtils.decompress(rawData));
-                                } else {
-                                    epResultItr.append(rawData);
-                                }
-                                // put segment query result to cache if cache is enabled
-                                if (querySegmentCacheEnabled) {
-                                    try {
-                                        segmentQueryResultBuilder.putRegionResult(rawData);
-                                        if (segmentQueryResultBuilder.isComplete()) {
-                                            CubeSegmentStatistics cubeSegmentStatistics = queryContext
-                                                    .getCubeSegmentStatistics(storageContext.ctxId,
-                                                            cubeSeg.getCubeInstance().getName(), cubeSeg.getName());
-                                            if (cubeSegmentStatistics != null) {
-                                                segmentQueryResultBuilder
-                                                        .setCubeSegmentStatistics(cubeSegmentStatistics);
-                                                logger.info(
-                                                        "Query-{}: try to put segment query result to cache for segment:{}",
-                                                        queryContext.getQueryId(), cubeSeg);
-                                                SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder
-                                                        .build();
-                                                SegmentQueryCache.getInstance().put(segmentQueryCacheKey,
-                                                        segmentQueryResult);
-                                                logger.info(
-                                                        "Query-{}: successfully put segment query result to cache for segment:{}",
-                                                        queryContext.getQueryId(), cubeSeg);
-                                            }
+                            epResultItr.append(queueData);
+                            // put segment query result to cache if cache is enabled
+                            if (querySegmentCacheEnabled) {
+                                try {
+                                    segmentQueryResultBuilder.putRegionResult(rawData);
+                                    if (segmentQueryResultBuilder.isComplete()) {
+                                        CubeSegmentStatistics cubeSegmentStatistics = queryContext
+                                                .getCubeSegmentStatistics(storageContext.ctxId,
+                                                        cubeSeg.getCubeInstance().getName(), cubeSeg.getName());
+                                        if (cubeSegmentStatistics != null) {
+                                            segmentQueryResultBuilder.setCubeSegmentStatistics(cubeSegmentStatistics);
+                                            logger.info(
+                                                    "Query-{}: try to put segment query result to cache for segment:{}",
+                                                    queryContext.getQueryId(), cubeSeg);
+                                            SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder.build();
+                                            SegmentQueryCache.getInstance().put(segmentQueryCacheKey,
+                                                    segmentQueryResult);
+                                            logger.info(
+                                                    "Query-{}: successfully put segment query result to cache for segment:{}",
+                                                    queryContext.getQueryId(), cubeSeg);
                                         }
-                                    } catch (Throwable t) {
-                                        logger.error("Fail to put query segment result to cache", t);
                                     }
+                                } catch (Throwable t) {
+                                    logger.error("Fail to put query segment result to cache", t);
                                 }
-                            } catch (IOException | DataFormatException e) {
-                                throw new RuntimeException(logHeader + "Error when decompressing", e);
                             }
                         }
                     });