You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labeled "here") was not preserved in this plain-text version.
Posted to dev@kylin.apache.org by GitBox <gi...@apache.org> on 2018/07/27 01:48:16 UTC

[GitHub] shaofengshi closed pull request #172: KYLIN-3453 Improve cube size estimation for topn, count distinct

shaofengshi closed pull request #172: KYLIN-3453 Improve cube size estimation for topn, count distinct
URL: https://github.com/apache/kylin/pull/172
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 637502ef05..025a982bdf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -426,7 +426,6 @@ public double getExtTableSnapshotLocalCacheMaxSizeGB() {
         return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
     }
 
-
     // ============================================================================
     // CUBE
     // ============================================================================
@@ -449,7 +448,11 @@ public double getJobCuboidSizeMemHungryRatio() {
     }
 
     public double getJobCuboidSizeCountDistinctRatio() {
-        return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.05"));
+        return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.5"));
+    }
+
+    public double getJobCuboidSizeTopNRatio() {
+        return Double.parseDouble(getOptional("kylin.cube.size-estimate-topn-ratio", "0.5"));
     }
 
     public String getCubeAlgorithm() {
@@ -872,7 +875,7 @@ public int getSqoopMapperNum() {
     public Map<String, String> getSqoopConfigOverride() {
         return getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override.");
     }
-    
+
     public String getJdbcSourceFieldDelimiter() {
         return getOptional("kylin.source.jdbc.field-delimiter", "|");
     }
@@ -1223,11 +1226,11 @@ public boolean isSparkSanityCheckEnabled() {
     public Boolean isEnumerableRulesEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.calcite.enumerable-rules-enabled", "false"));
     }
-    
+
     public boolean isReduceExpressionsRulesEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.calcite.reduce-rules-enabled", "true"));
     }
-    
+
     public boolean isConvertCreateTableToWith() {
         return Boolean.valueOf(getOptional("kylin.query.convert-create-table-to-with", "false"));
     }
@@ -1328,12 +1331,13 @@ public int getBadQueryHistoryNum() {
     public int getBadQueryDefaultAlertingSeconds() {
         return Integer.parseInt(getOptional("kylin.query.badquery-alerting-seconds", "90"));
     }
+
     public double getBadQueryDefaultAlertingCoefficient() {
         return Double.parseDouble(getOptional("kylin.query.timeout-seconds-coefficient", "0.5"));
     }
 
     public int getBadQueryDefaultDetectIntervalSeconds() {
-        int time =(int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
+        int time = (int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
         if (time == 0) {
             time = 60; // 60 sec
         }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index 1c138761f3..29a25e9b5b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -30,6 +30,7 @@
 
     private static final int IS_RESULT_FLAG = 1;
     private static final int RESULT_SIZE = 12;
+    private static final int DEFAULT_MAX_SIZE = 1024;
 
     // called by reflection
     public BitmapSerializer(DataType type) {
@@ -85,6 +86,19 @@ public int getStorageBytesEstimate() {
         return 8 * 1024;
     }
 
+    @Override
+    protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+        // MappeableArrayContainer DEFAULT_MAX_SIZE = 4096
+        if (averageNumOfElementsInCounter < DEFAULT_MAX_SIZE) {
+            // 8 = 4 + 4 for SERIAL_COOKIE_NO_RUNCONTAINER + size
+            // size * 8 = 2 * size + 2 * size + 4 * size as keys + values Cardinality + startOffsets
+            // size * 8 for values array
+            return 8 + averageNumOfElementsInCounter * 16;
+        } else {
+            return getStorageBytesEstimate();
+        }
+    }
+
     @Override
     public boolean supportDirectReturnResult() {
         return true;
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
index 98bc5cf772..9310864916 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -80,4 +80,16 @@ public int getStorageBytesEstimate() {
         return current().maxLength();
     }
 
+    @Override
+    protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+        int registerIndexSize = current().getRegisterIndexSize();
+        int m = 1 << precision;
+        if (!current().isDense((int) averageNumOfElementsInCounter)
+                || averageNumOfElementsInCounter < (m - 5) / (1 + registerIndexSize)) {
+            // 5 = 1 + 4 for scheme and size
+            // size * (getRegisterIndexSize + 1)
+            return 5 + averageNumOfElementsInCounter * (registerIndexSize + 1);
+        }
+        return getStorageBytesEstimate();
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index b79346503e..80bbb2a9c1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -78,7 +78,7 @@ public HLLCounter(int p, RegisterType type) {
         }
     }
 
-    private boolean isDense(int size) {
+    public boolean isDense(int size) {
         double over = OVERFLOW_FACTOR * m;
         return size > (int) over;
     }
@@ -358,7 +358,7 @@ public int maxLength() {
         return 1 + m;
     }
 
-    private int getRegisterIndexSize() {
+    public int getRegisterIndexSize() {
         return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index 77a69cf935..eff510f8d0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -61,12 +61,12 @@ public int peekLength(ByteBuffer in) {
 
     @Override
     public int maxLength() {
-        return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8), 1024 * 1024); // use at least 1M
+        return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(), 1024 * 1024); // use at least 1M
     }
 
     @Override
     public int getStorageBytesEstimate() {
-        return precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8);
+        return precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter();
     }
 
     @Override
@@ -107,4 +107,17 @@ public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
         return counter;
     }
 
+    @Override
+    protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+        if (averageNumOfElementsInCounter < precision * TopNCounter.EXTRA_SPACE_RATE) {
+            return averageNumOfElementsInCounter * storageBytesEstimatePerCounter() + 12;
+        } else {
+            return getStorageBytesEstimate();
+        }
+    }
+
+    private int storageBytesEstimatePerCounter() {
+        return (scale + 8);
+    }
+
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 3c054a3283..6b8934abb4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -55,6 +55,7 @@
 import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.topn.TopNMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -79,6 +80,7 @@
     final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge
     final Map<Long, HLLCounter> cuboidRowEstimatesHLL;
     final CuboidScheduler cuboidScheduler;
+    final long sourceRowCount;
 
     public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
         this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
@@ -94,7 +96,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
         RawResource resource = store.getResource(statsKey);
         if (resource == null)
             throw new IllegalStateException("Missing resource at " + statsKey);
-        
+
         File tmpSeqFile = writeTmpSeqFile(resource.inputStream);
         Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
 
@@ -107,6 +109,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
         this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
         this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
         this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+        this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
     }
 
     /**
@@ -129,6 +132,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
         this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
         this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
         this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+        this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
     }
 
     private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -158,7 +162,7 @@ public int getSamplingPercentage() {
 
     // return map of Cuboid ID => MB
     public Map<Long, Double> getCuboidSizeMap() {
-        return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL());
+        return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount);
     }
 
     public double estimateCubeSize() {
@@ -184,7 +188,8 @@ public double getMapperOverlapRatioOfFirstBuild() {
         return cuboidRowCountMap;
     }
 
-    public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
+    public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
+            long sourceRowCount) {
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         final List<Integer> rowkeyColumnSize = Lists.newArrayList();
         final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
@@ -199,7 +204,7 @@ public double getMapperOverlapRatioOfFirstBuild() {
         Map<Long, Double> sizeMap = Maps.newHashMap();
         for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) {
             sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(),
-                    baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize));
+                    baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
         }
         return sizeMap;
     }
@@ -210,7 +215,7 @@ public double getMapperOverlapRatioOfFirstBuild() {
      * @return the cuboid size in M bytes
      */
     private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount,
-            long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength) {
+            long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength, long sourceRowCount) {
 
         int rowkeyLength = cubeSegment.getRowKeyPreambleSize();
         KylinConfig kylinConf = cubeSegment.getConfig();
@@ -228,12 +233,21 @@ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cu
         int normalSpace = rowkeyLength;
         int countDistinctSpace = 0;
         double percentileSpace = 0;
+        int topNSpace = 0;
         for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
+            if (rowCount == 0)
+                break;
             DataType returnType = measureDesc.getFunction().getReturnDataType();
             if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) {
-                countDistinctSpace += returnType.getStorageBytesEstimate();
+                long estimateDistinctCount = sourceRowCount / rowCount;
+                estimateDistinctCount = estimateDistinctCount == 0 ? 1L : estimateDistinctCount;
+                countDistinctSpace += returnType.getStorageBytesEstimate(estimateDistinctCount);
             } else if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_PERCENTILE)) {
                 percentileSpace += returnType.getStorageBytesEstimate(baseCuboidCount * 1.0 / rowCount);
+            } else if (measureDesc.getFunction().getExpression().equals(TopNMeasureType.FUNC_TOP_N)) {
+                long estimateTopNCount = sourceRowCount / rowCount;
+                estimateTopNCount = estimateTopNCount == 0 ? 1L : estimateTopNCount;
+                topNSpace += returnType.getStorageBytesEstimate(estimateTopNCount);
             } else {
                 normalSpace += returnType.getStorageBytesEstimate();
             }
@@ -241,9 +255,11 @@ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cu
 
         double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
         double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
+        double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio();
+
         double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio
-                + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount)
-                / (1024L * 1024L);
+                + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount
+                + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L);
         return ret;
     }
 
@@ -351,6 +367,7 @@ private static String formatDouble(double input) {
     public static class CubeStatsResult {
         private int percentage = 100;
         private double mapperOverlapRatio = 0;
+        private long sourceRecordCount = 0;
         private int mapperNumber = 0;
         private Map<Long, HLLCounter> counterMap = Maps.newHashMap();
 
@@ -367,6 +384,8 @@ public CubeStatsResult(Path path, int precision) throws IOException {
                         mapperOverlapRatio = Bytes.toDouble(value.getBytes());
                     } else if (key.get() == -2) {
                         mapperNumber = Bytes.toInt(value.getBytes());
+                    } else if (key.get() == -3) {
+                        sourceRecordCount = Bytes.toLong(value.getBytes());
                     } else if (key.get() > 0) {
                         HLLCounter hll = new HLLCounter(precision);
                         ByteArray byteArray = new ByteArray(value.getBytes());
@@ -392,6 +411,10 @@ public int getMapperNumber() {
         public Map<Long, HLLCounter> getCounterMap() {
             return Collections.unmodifiableMap(counterMap);
         }
+
+        public long getSourceRecordCount() {
+            return sourceRecordCount;
+        }
     }
 
     public static void main(String[] args) throws IOException {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index f50a4beb97..c3d6042b1d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -40,14 +40,15 @@
 
     public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
             Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
-        writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0);
+        writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0);
     }
 
     public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
-            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException {
+            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
+            long sourceRecordCoun) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
         writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
-                mapperOverlapRatio);
+                mapperOverlapRatio, sourceRecordCoun);
     }
 
     //Be care of that the file name for partial cuboid statistics should start with BatchConstants.CFG_OUTPUT_STATISTICS,
@@ -57,12 +58,12 @@ public static void writePartialCuboidStatistics(Configuration conf, Path outputP
             int shard) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard);
         writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
-                mapperOverlapRatio);
+                mapperOverlapRatio, 0);
     }
 
     private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, //
-            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio)
-            throws IOException {
+            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
+            long sourceRecordCount) throws IOException {
         List<Long> allCuboids = Lists.newArrayList();
         allCuboids.addAll(cuboidHLLMap.keySet());
         Collections.sort(allCuboids);
@@ -80,6 +81,9 @@ private static void writeCuboidStatisticsInner(Configuration conf, Path outputFi
             // sampling percentage at key 0
             writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)));
 
+            // flat table source_count at key -3
+            writer.append(new LongWritable(-3), new BytesWritable(Bytes.toBytes(sourceRecordCount)));
+
             for (long i : allCuboids) {
                 valueBuf.clear();
                 cuboidHLLMap.get(i).writeRegisters(valueBuf);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index b532360f36..1f79539f6d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -122,12 +122,15 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
                         totalRowsBeforeMerge);
             }
             double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
+            CubingJob cubingJob = (CubingJob) getManager()
+                    .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+            long sourceRecordCount = cubingJob.findSourceRecordCount();
             CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,
-                    mapperNumber, mapperOverlapRatio);
+                    mapperNumber, mapperOverlapRatio, sourceRecordCount);
 
             Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             logger.info(newSegment + " stats saved to hdfs " + statisticsFile);
-            
+
             FSDataInputStream is = fs.open(statisticsFile);
             try {
                 // put the statistics to metadata store
@@ -135,8 +138,6 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
                 rs.putResource(resPath, is, System.currentTimeMillis());
                 logger.info(newSegment + " stats saved to resource " + resPath);
 
-                CubingJob cubingJob = (CubingJob) getManager()
-                        .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
                 StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment);
                 StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
             } finally {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the pull request.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services