You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/27 01:48:16 UTC

[kylin] branch master updated: KYLIN-3453 Improve cube size estimation for topn, count distinct

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cd0026  KYLIN-3453 Improve cube size estimation for topn,count distinct
1cd0026 is described below

commit 1cd00269a63f51a011f93effe6fc982d3e614459
Author: chao long <wa...@qq.com>
AuthorDate: Thu Jul 26 11:46:25 2018 +0800

    KYLIN-3453 Improve cube size estimation for topn,count distinct
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 16 +++++----
 .../kylin/measure/bitmap/BitmapSerializer.java     | 14 ++++++++
 .../apache/kylin/measure/hllc/HLLCSerializer.java  | 12 +++++++
 .../org/apache/kylin/measure/hllc/HLLCounter.java  |  4 +--
 .../kylin/measure/topn/TopNCounterSerializer.java  | 17 ++++++++--
 .../kylin/engine/mr/common/CubeStatsReader.java    | 39 +++++++++++++++++-----
 .../kylin/engine/mr/common/CubeStatsWriter.java    | 16 +++++----
 .../kylin/engine/mr/steps/SaveStatisticsStep.java  |  9 ++---
 8 files changed, 99 insertions(+), 28 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b2331e1..43e7d62 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -426,7 +426,6 @@ abstract public class KylinConfigBase implements Serializable {
         return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
     }
 
-
     // ============================================================================
     // CUBE
     // ============================================================================
@@ -449,7 +448,11 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public double getJobCuboidSizeCountDistinctRatio() {
-        return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.05"));
+        return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.5"));
+    }
+
+    public double getJobCuboidSizeTopNRatio() {
+        return Double.parseDouble(getOptional("kylin.cube.size-estimate-topn-ratio", "0.5"));
     }
 
     public String getCubeAlgorithm() {
@@ -876,7 +879,7 @@ abstract public class KylinConfigBase implements Serializable {
     public Map<String, String> getSqoopConfigOverride() {
         return getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override.");
     }
-    
+
     public String getJdbcSourceFieldDelimiter() {
         return getOptional("kylin.source.jdbc.field-delimiter", "|");
     }
@@ -1227,11 +1230,11 @@ abstract public class KylinConfigBase implements Serializable {
     public Boolean isEnumerableRulesEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.calcite.enumerable-rules-enabled", "false"));
     }
-    
+
     public boolean isReduceExpressionsRulesEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.calcite.reduce-rules-enabled", "true"));
     }
-    
+
     public boolean isConvertCreateTableToWith() {
         return Boolean.valueOf(getOptional("kylin.query.convert-create-table-to-with", "false"));
     }
@@ -1332,12 +1335,13 @@ abstract public class KylinConfigBase implements Serializable {
     public int getBadQueryDefaultAlertingSeconds() {
         return Integer.parseInt(getOptional("kylin.query.badquery-alerting-seconds", "90"));
     }
+
     public double getBadQueryDefaultAlertingCoefficient() {
         return Double.parseDouble(getOptional("kylin.query.timeout-seconds-coefficient", "0.5"));
     }
 
     public int getBadQueryDefaultDetectIntervalSeconds() {
-        int time =(int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
+        int time = (int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
         if (time == 0) {
             time = 60; // 60 sec
         }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index 1c13876..29a25e9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -30,6 +30,7 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
 
     private static final int IS_RESULT_FLAG = 1;
     private static final int RESULT_SIZE = 12;
+    private static final int DEFAULT_MAX_SIZE = 1024;
 
     // called by reflection
     public BitmapSerializer(DataType type) {
@@ -86,6 +87,19 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
     }
 
     @Override
+    protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+        // MappeableArrayContainer DEFAULT_MAX_SIZE = 4096
+        if (averageNumOfElementsInCounter < DEFAULT_MAX_SIZE) {
+            // 8 bytes of header = 4 (SERIAL_COOKIE_NO_RUNCONTAINER) + 4 (size, i.e. number of containers)
+            // size * 8 bytes of index = 2 * size (keys) + 2 * size (cardinalities) + 4 * size (start offsets)
+            // size * 8 bytes for the values array
+            return 8 + averageNumOfElementsInCounter * 16;
+        } else {
+            return getStorageBytesEstimate();
+        }
+    }
+
+    @Override
     public boolean supportDirectReturnResult() {
         return true;
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
index 98bc5cf..9310864 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -80,4 +80,16 @@ public class HLLCSerializer extends DataTypeSerializer<HLLCounter> {
         return current().maxLength();
     }
 
+    @Override
+    protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+        int registerIndexSize = current().getRegisterIndexSize();
+        int m = 1 << precision;
+        if (!current().isDense((int) averageNumOfElementsInCounter)
+                || averageNumOfElementsInCounter < (m - 5) / (1 + registerIndexSize)) {
+            // 5 bytes of header = 1 (scheme) + 4 (size)
+            // plus size * (registerIndexSize + 1) bytes for the entries
+            return 5 + averageNumOfElementsInCounter * (registerIndexSize + 1);
+        }
+        return getStorageBytesEstimate();
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index b793465..80bbb2a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -78,7 +78,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         }
     }
 
-    private boolean isDense(int size) {
+    public boolean isDense(int size) {
         double over = OVERFLOW_FACTOR * m;
         return size > (int) over;
     }
@@ -358,7 +358,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         return 1 + m;
     }
 
-    private int getRegisterIndexSize() {
+    public int getRegisterIndexSize() {
         return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index 77a69cf..eff510f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -61,12 +61,12 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
 
     @Override
     public int maxLength() {
-        return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8), 1024 * 1024); // use at least 1M
+        return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(), 1024 * 1024); // use at least 1M
     }
 
     @Override
     public int getStorageBytesEstimate() {
-        return precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8);
+        return precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter();
     }
 
     @Override
@@ -107,4 +107,17 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
         return counter;
     }
 
+    @Override
+    protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+        if (averageNumOfElementsInCounter < precision * TopNCounter.EXTRA_SPACE_RATE) {
+            return averageNumOfElementsInCounter * storageBytesEstimatePerCounter() + 12;
+        } else {
+            return getStorageBytesEstimate();
+        }
+    }
+
+    private int storageBytesEstimatePerCounter() {
+        return (scale + 8);
+    }
+
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 3c054a3..6b8934a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -55,6 +55,7 @@ import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.topn.TopNMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -79,6 +80,7 @@ public class CubeStatsReader {
     final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge
     final Map<Long, HLLCounter> cuboidRowEstimatesHLL;
     final CuboidScheduler cuboidScheduler;
+    final long sourceRowCount;
 
     public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
         this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
@@ -94,7 +96,7 @@ public class CubeStatsReader {
         RawResource resource = store.getResource(statsKey);
         if (resource == null)
             throw new IllegalStateException("Missing resource at " + statsKey);
-        
+
         File tmpSeqFile = writeTmpSeqFile(resource.inputStream);
         Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
 
@@ -107,6 +109,7 @@ public class CubeStatsReader {
         this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
         this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
         this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+        this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
     }
 
     /**
@@ -129,6 +132,7 @@ public class CubeStatsReader {
         this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
         this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
         this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+        this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
     }
 
     private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -158,7 +162,7 @@ public class CubeStatsReader {
 
     // return map of Cuboid ID => MB
     public Map<Long, Double> getCuboidSizeMap() {
-        return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL());
+        return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount);
     }
 
     public double estimateCubeSize() {
@@ -184,7 +188,8 @@ public class CubeStatsReader {
         return cuboidRowCountMap;
     }
 
-    public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
+    public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
+            long sourceRowCount) {
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         final List<Integer> rowkeyColumnSize = Lists.newArrayList();
         final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
@@ -199,7 +204,7 @@ public class CubeStatsReader {
         Map<Long, Double> sizeMap = Maps.newHashMap();
         for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) {
             sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(),
-                    baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize));
+                    baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
         }
         return sizeMap;
     }
@@ -210,7 +215,7 @@ public class CubeStatsReader {
      * @return the cuboid size in M bytes
      */
     private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount,
-            long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength) {
+            long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength, long sourceRowCount) {
 
         int rowkeyLength = cubeSegment.getRowKeyPreambleSize();
         KylinConfig kylinConf = cubeSegment.getConfig();
@@ -228,12 +233,21 @@ public class CubeStatsReader {
         int normalSpace = rowkeyLength;
         int countDistinctSpace = 0;
         double percentileSpace = 0;
+        int topNSpace = 0;
         for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
+            if (rowCount == 0)
+                break;
             DataType returnType = measureDesc.getFunction().getReturnDataType();
             if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) {
-                countDistinctSpace += returnType.getStorageBytesEstimate();
+                long estimateDistinctCount = sourceRowCount / rowCount;
+                estimateDistinctCount = estimateDistinctCount == 0 ? 1L : estimateDistinctCount;
+                countDistinctSpace += returnType.getStorageBytesEstimate(estimateDistinctCount);
             } else if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_PERCENTILE)) {
                 percentileSpace += returnType.getStorageBytesEstimate(baseCuboidCount * 1.0 / rowCount);
+            } else if (measureDesc.getFunction().getExpression().equals(TopNMeasureType.FUNC_TOP_N)) {
+                long estimateTopNCount = sourceRowCount / rowCount;
+                estimateTopNCount = estimateTopNCount == 0 ? 1L : estimateTopNCount;
+                topNSpace += returnType.getStorageBytesEstimate(estimateTopNCount);
             } else {
                 normalSpace += returnType.getStorageBytesEstimate();
             }
@@ -241,9 +255,11 @@ public class CubeStatsReader {
 
         double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
         double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
+        double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio();
+
         double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio
-                + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount)
-                / (1024L * 1024L);
+                + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount
+                + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L);
         return ret;
     }
 
@@ -351,6 +367,7 @@ public class CubeStatsReader {
     public static class CubeStatsResult {
         private int percentage = 100;
         private double mapperOverlapRatio = 0;
+        private long sourceRecordCount = 0;
         private int mapperNumber = 0;
         private Map<Long, HLLCounter> counterMap = Maps.newHashMap();
 
@@ -367,6 +384,8 @@ public class CubeStatsReader {
                         mapperOverlapRatio = Bytes.toDouble(value.getBytes());
                     } else if (key.get() == -2) {
                         mapperNumber = Bytes.toInt(value.getBytes());
+                    } else if (key.get() == -3) {
+                        sourceRecordCount = Bytes.toLong(value.getBytes());
                     } else if (key.get() > 0) {
                         HLLCounter hll = new HLLCounter(precision);
                         ByteArray byteArray = new ByteArray(value.getBytes());
@@ -392,6 +411,10 @@ public class CubeStatsReader {
         public Map<Long, HLLCounter> getCounterMap() {
             return Collections.unmodifiableMap(counterMap);
         }
+
+        public long getSourceRecordCount() {
+            return sourceRecordCount;
+        }
     }
 
     public static void main(String[] args) throws IOException {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index f50a4be..c3d6042 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -40,14 +40,15 @@ public class CubeStatsWriter {
 
     public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
             Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
-        writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0);
+        writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0);
     }
 
     public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
-            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException {
+            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
+            long sourceRecordCoun) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
         writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
-                mapperOverlapRatio);
+                mapperOverlapRatio, sourceRecordCoun);
     }
 
     //Be care of that the file name for partial cuboid statistics should start with BatchConstants.CFG_OUTPUT_STATISTICS,
@@ -57,12 +58,12 @@ public class CubeStatsWriter {
             int shard) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard);
         writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
-                mapperOverlapRatio);
+                mapperOverlapRatio, 0);
     }
 
     private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, //
-            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio)
-            throws IOException {
+            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
+            long sourceRecordCount) throws IOException {
         List<Long> allCuboids = Lists.newArrayList();
         allCuboids.addAll(cuboidHLLMap.keySet());
         Collections.sort(allCuboids);
@@ -80,6 +81,9 @@ public class CubeStatsWriter {
             // sampling percentage at key 0
             writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)));
 
+            // flat table source_count at key -3
+            writer.append(new LongWritable(-3), new BytesWritable(Bytes.toBytes(sourceRecordCount)));
+
             for (long i : allCuboids) {
                 valueBuf.clear();
                 cuboidHLLMap.get(i).writeRegisters(valueBuf);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index b532360..1f79539 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -122,12 +122,15 @@ public class SaveStatisticsStep extends AbstractExecutable {
                         totalRowsBeforeMerge);
             }
             double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
+            CubingJob cubingJob = (CubingJob) getManager()
+                    .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+            long sourceRecordCount = cubingJob.findSourceRecordCount();
             CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,
-                    mapperNumber, mapperOverlapRatio);
+                    mapperNumber, mapperOverlapRatio, sourceRecordCount);
 
             Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             logger.info(newSegment + " stats saved to hdfs " + statisticsFile);
-            
+
             FSDataInputStream is = fs.open(statisticsFile);
             try {
                 // put the statistics to metadata store
@@ -135,8 +138,6 @@ public class SaveStatisticsStep extends AbstractExecutable {
                 rs.putResource(resPath, is, System.currentTimeMillis());
                 logger.info(newSegment + " stats saved to resource " + resPath);
 
-                CubingJob cubingJob = (CubingJob) getManager()
-                        .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
                 StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment);
                 StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
             } finally {