You are viewing a plain text version of this content. The canonical link for it is here. [NOTE: the hyperlink target was lost in the plain-text conversion of this archive page.]
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/10 09:30:12 UTC
[kylin] branch 2.6.x updated (85c7388 -> edab859)
This is an automated email from the ASF dual-hosted git repository.
nic pushed a change to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git.
from 85c7388 KYLIN-3995 config data type may make oom
new dff3a3c KYLIN-3926 set sourceRecordCount when updating statistics
new edab859 KYLIN-3926 Code review
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../kylin/engine/mr/common/CubeStatsReader.java | 4 ++
.../kylin/engine/mr/common/CubeStatsWriter.java | 5 +++
.../engine/mr/steps/MergeDictionaryMapper.java | 20 ++++++---
.../kylin/engine/mr/steps/MergeStatisticsStep.java | 24 +++++++++--
.../mr/steps/MergeStatisticsWithOldStep.java | 2 +-
.../kylin/engine/spark/SparkMergingDictionary.java | 47 +++++++++++++++-------
6 files changed, 77 insertions(+), 25 deletions(-)
[kylin] 02/02: KYLIN-3926 Code review
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit edab859ee51128f94d4e192abc56dc1171c468a9
Author: nichunen <ni...@apache.org>
AuthorDate: Tue May 7 13:59:24 2019 +0800
KYLIN-3926 Code review
---
.../engine/mr/steps/MergeDictionaryMapper.java | 20 ++++++---
.../kylin/engine/mr/steps/MergeStatisticsStep.java | 12 ++++--
.../kylin/engine/spark/SparkMergingDictionary.java | 47 +++++++++++++++-------
3 files changed, 55 insertions(+), 24 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
index 8e09783..0ce013e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
@@ -133,6 +133,8 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
Configuration conf = null;
int averageSamplingPercentage = 0;
+ long sourceRecordCount = 0;
+ long effectiveTimeRange = 0;
for (CubeSegment cubeSegment : mergingSegments) {
String filePath = cubeSegment.getStatisticsResourcePath();
@@ -162,7 +164,14 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
if (keyW.get() == 0L) {
// sampling percentage;
averageSamplingPercentage += Bytes.toInt(valueW.getBytes());
- } else if (keyW.get() > 0) {
+ } else if (keyW.get() == -3) {
+ long perSourceRecordCount = Bytes.toLong(valueW.getBytes());
+ if (perSourceRecordCount > 0) {
+ sourceRecordCount += perSourceRecordCount;
+ CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+ effectiveTimeRange += iSegment.getTSRange().duration();
+ }
+ } else if (keyW.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(valueW.getBytes());
hll.readRegisters(byteArray.asBuffer());
@@ -181,12 +190,13 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
IOUtils.closeStream(reader);
}
}
-
- averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
- CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
- averageSamplingPercentage);
+ sourceRecordCount *= effectiveTimeRange == 0 ? 0
+ : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
Path statisticsFilePath = new Path(statOutputPath,
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+ CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+ averageSamplingPercentage, sourceRecordCount);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream fis = fs.open(statisticsFilePath);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 5d4b35d..9fab177 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -78,7 +78,8 @@ public class MergeStatisticsStep extends AbstractExecutable {
long sourceRecordCount = 0;
long effectiveTimeRange = 0;
for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
- String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
+ String fileKey = CubeSegment
+ .getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
InputStream is = rs.getResource(fileKey).content();
File tempFile = null;
FileOutputStream tempFileStream = null;
@@ -129,12 +130,15 @@ public class MergeStatisticsStep extends AbstractExecutable {
tempFile.delete();
}
}
- sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
- averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+ sourceRecordCount *= effectiveTimeRange == 0 ? 0
+ : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
+ averageSamplingPercentage = averageSamplingPercentage
+ / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
CubeStatsWriter.writeCuboidStatistics(conf,
new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
averageSamplingPercentage, sourceRecordCount);
- Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()),
+ BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream is = fs.open(statisticsFilePath);
try {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 286c6f3..7e58871 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -18,8 +18,13 @@
package org.apache.kylin.engine.spark;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -62,14 +67,11 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import scala.Tuple2;
/**
merge dictionary
@@ -236,14 +238,16 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
Configuration conf = null;
int averageSamplingPercentage = 0;
+ long sourceRecordCount = 0;
+ long effectiveTimeRange = 0;
for (CubeSegment cubeSegment : mergingSegments) {
String filePath = cubeSegment.getStatisticsResourcePath();
File tempFile = File.createTempFile(segmentId, ".seq");
- try(InputStream is = rs.getResource(filePath).content();
- FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
+ try (InputStream is = rs.getResource(filePath).content();
+ FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
org.apache.commons.io.IOUtils.copy(is, tempFileStream);
}
@@ -252,15 +256,24 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
conf = HadoopUtil.getCurrentConfiguration();
- try(SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf)) {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ new Path(tempFile.getAbsolutePath()), conf)) {
//noinspection deprecation
LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(),
+ conf);
while (reader.next(key, value)) {
if (key.get() == 0L) {
// sampling percentage
averageSamplingPercentage += Bytes.toInt(value.getBytes());
+ } else if (key.get() == -3) {
+ long perSourceRecordCount = Bytes.toLong(value.getBytes());
+ if (perSourceRecordCount > 0) {
+ sourceRecordCount += perSourceRecordCount;
+ CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+ effectiveTimeRange += iSegment.getTSRange().duration();
+ }
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -276,9 +289,13 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
}
}
+ sourceRecordCount *= effectiveTimeRange == 0 ? 0
+ : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
- CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
- Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+ averageSamplingPercentage, sourceRecordCount);
+ Path statisticsFilePath = new Path(statOutputPath,
+ BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream fis = fs.open(statisticsFilePath);
[kylin] 01/02: KYLIN-3926 set sourceRecordCount when updating
statistics
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit dff3a3c3c9dd2802e44de8273b785339ce232441
Author: kyotoYaho <nj...@apache.org>
AuthorDate: Mon Apr 1 16:39:26 2019 +0800
KYLIN-3926 set sourceRecordCount when updating statistics
---
.../org/apache/kylin/engine/mr/common/CubeStatsReader.java | 4 ++++
.../org/apache/kylin/engine/mr/common/CubeStatsWriter.java | 5 +++++
.../apache/kylin/engine/mr/steps/MergeStatisticsStep.java | 14 +++++++++++++-
.../kylin/engine/mr/steps/MergeStatisticsWithOldStep.java | 2 +-
4 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 58f0e66..e935173 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -161,6 +161,10 @@ public class CubeStatsReader {
return samplingPercentage;
}
+ public long getSourceRowCount() {
+ return sourceRowCount;
+ }
+
public Map<Long, Long> getCuboidRowEstimatesHLL() {
return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index c3d6042..0945908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -44,6 +44,11 @@ public class CubeStatsWriter {
}
public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
+ Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, long sourceRecordCoun) throws IOException {
+ writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, sourceRecordCoun);
+ }
+
+ public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
long sourceRecordCoun) throws IOException {
Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 64ceebe..5d4b35d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -75,6 +75,8 @@ public class MergeStatisticsStep extends AbstractExecutable {
try {
int averageSamplingPercentage = 0;
+ long sourceRecordCount = 0;
+ long effectiveTimeRange = 0;
for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
InputStream is = rs.getResource(fileKey).content();
@@ -99,6 +101,13 @@ public class MergeStatisticsStep extends AbstractExecutable {
if (key.get() == 0L) {
// sampling percentage;
averageSamplingPercentage += Bytes.toInt(value.getBytes());
+ } else if (key.get() == -3) {
+ long perSourceRecordCount = Bytes.toLong(value.getBytes());
+ if (perSourceRecordCount > 0) {
+ sourceRecordCount += perSourceRecordCount;
+ CubeSegment iSegment = cube.getSegmentById(segmentId);
+ effectiveTimeRange += iSegment.getTSRange().duration();
+ }
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -120,8 +129,11 @@ public class MergeStatisticsStep extends AbstractExecutable {
tempFile.delete();
}
}
+ sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
- CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
+ CubeStatsWriter.writeCuboidStatistics(conf,
+ new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
+ averageSamplingPercentage, sourceRecordCount);
Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream is = fs.open(statisticsFilePath);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
index 434892c..8dd7341 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -120,7 +120,7 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams());
CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap,
- averageSamplingPercentage);
+ averageSamplingPercentage, oldSegmentStatsReader.getSourceRowCount());
try (FSDataInputStream mergedStats = hdfs
.open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {