You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/07 12:49:22 UTC

[kylin] 01/02: KYLIN-3926 set sourceRecordCount when updating statistics

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5098f206ec86fee7d5e68c782e9e3aa803113fc3
Author: kyotoYaho <nj...@apache.org>
AuthorDate: Mon Apr 1 16:39:26 2019 +0800

    KYLIN-3926 set sourceRecordCount when updating statistics
---
 .../org/apache/kylin/engine/mr/common/CubeStatsReader.java |  4 ++++
 .../org/apache/kylin/engine/mr/common/CubeStatsWriter.java |  5 +++++
 .../apache/kylin/engine/mr/steps/MergeStatisticsStep.java  | 14 +++++++++++++-
 .../kylin/engine/mr/steps/MergeStatisticsWithOldStep.java  |  2 +-
 4 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 58f0e66..e935173 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -161,6 +161,10 @@ public class CubeStatsReader {
         return samplingPercentage;
     }
 
+    public long getSourceRowCount() { // plain accessor: returns the sourceRowCount field as-is
+        return sourceRowCount;
+    }
+
     public Map<Long, Long> getCuboidRowEstimatesHLL() {
         return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index c3d6042..0945908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -44,6 +44,11 @@ public class CubeStatsWriter {
     }
 
     public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
+            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, long sourceRecordCoun) throws IOException { // overload without the mapper arguments
+        writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, sourceRecordCoun); // delegates with mapperNumber = 0 and mapperOverlapRatio = 0
+    }
+
+    public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
             Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
             long sourceRecordCoun) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 64ceebe..5d4b35d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -75,6 +75,8 @@ public class MergeStatisticsStep extends AbstractExecutable {
         try {
 
             int averageSamplingPercentage = 0;
+            long sourceRecordCount = 0;
+            long effectiveTimeRange = 0;
             for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
                 String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
                 InputStream is = rs.getResource(fileKey).content();
@@ -99,6 +101,13 @@ public class MergeStatisticsStep extends AbstractExecutable {
                         if (key.get() == 0L) {
                             // sampling percentage;
                             averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                        } else if (key.get() == -3) {
+                            long perSourceRecordCount = Bytes.toLong(value.getBytes());
+                            if (perSourceRecordCount > 0) {
+                                sourceRecordCount += perSourceRecordCount;
+                                CubeSegment iSegment = cube.getSegmentById(segmentId);
+                                effectiveTimeRange += iSegment.getTSRange().duration();
+                            }
                         } else if (key.get() > 0) {
                             HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision());
                             ByteArray byteArray = new ByteArray(value.getBytes());
@@ -120,8 +129,11 @@ public class MergeStatisticsStep extends AbstractExecutable {
                         tempFile.delete();
                 }
             }
+            sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
             averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
-            CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
+            CubeStatsWriter.writeCuboidStatistics(conf,
+                    new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
+                    averageSamplingPercentage, sourceRecordCount);
             Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
index 434892c..8dd7341 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -120,7 +120,7 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
 
             String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams());
             CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap,
-                    averageSamplingPercentage);
+                    averageSamplingPercentage, oldSegmentStatsReader.getSourceRowCount());
 
             try (FSDataInputStream mergedStats = hdfs
                     .open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {