You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/07 12:49:22 UTC
[kylin] 01/02: KYLIN-3926 set sourceRecordCount when updating statistics
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5098f206ec86fee7d5e68c782e9e3aa803113fc3
Author: kyotoYaho <nj...@apache.org>
AuthorDate: Mon Apr 1 16:39:26 2019 +0800
KYLIN-3926 set sourceRecordCount when updating statistics
---
.../org/apache/kylin/engine/mr/common/CubeStatsReader.java | 4 ++++
.../org/apache/kylin/engine/mr/common/CubeStatsWriter.java | 5 +++++
.../apache/kylin/engine/mr/steps/MergeStatisticsStep.java | 14 +++++++++++++-
.../kylin/engine/mr/steps/MergeStatisticsWithOldStep.java | 2 +-
4 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 58f0e66..e935173 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -161,6 +161,10 @@ public class CubeStatsReader {
return samplingPercentage;
}
+ public long getSourceRowCount() {
+ return sourceRowCount;
+ }
+
public Map<Long, Long> getCuboidRowEstimatesHLL() {
return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index c3d6042..0945908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -44,6 +44,11 @@ public class CubeStatsWriter {
}
public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
+ Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, long sourceRecordCoun) throws IOException {
+ writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, sourceRecordCoun);
+ }
+
+ public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
long sourceRecordCoun) throws IOException {
Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 64ceebe..5d4b35d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -75,6 +75,8 @@ public class MergeStatisticsStep extends AbstractExecutable {
try {
int averageSamplingPercentage = 0;
+ long sourceRecordCount = 0;
+ long effectiveTimeRange = 0;
for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
InputStream is = rs.getResource(fileKey).content();
@@ -99,6 +101,13 @@ public class MergeStatisticsStep extends AbstractExecutable {
if (key.get() == 0L) {
// sampling percentage;
averageSamplingPercentage += Bytes.toInt(value.getBytes());
+ } else if (key.get() == -3) {
+ long perSourceRecordCount = Bytes.toLong(value.getBytes());
+ if (perSourceRecordCount > 0) {
+ sourceRecordCount += perSourceRecordCount;
+ CubeSegment iSegment = cube.getSegmentById(segmentId);
+ effectiveTimeRange += iSegment.getTSRange().duration();
+ }
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -120,8 +129,11 @@ public class MergeStatisticsStep extends AbstractExecutable {
tempFile.delete();
}
}
+ sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
- CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
+ CubeStatsWriter.writeCuboidStatistics(conf,
+ new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
+ averageSamplingPercentage, sourceRecordCount);
Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream is = fs.open(statisticsFilePath);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
index 434892c..8dd7341 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -120,7 +120,7 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams());
CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap,
- averageSamplingPercentage);
+ averageSamplingPercentage, oldSegmentStatsReader.getSourceRowCount());
try (FSDataInputStream mergedStats = hdfs
.open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {