You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/07 12:49:23 UTC
[kylin] 02/02: KYLIN-3926 Code review
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ab124ac9ffd988c9e1ec4b0d9513db49724b70a1
Author: nichunen <ni...@apache.org>
AuthorDate: Tue May 7 13:59:24 2019 +0800
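KYLIN-3926 Code review
Pass the accumulated source record count to CubeStatsWriter.writeCuboidStatistics in MergeDictionaryMapper and SparkMergingDictionary (as MergeStatisticsStep already does), cast the time-range ratio to double in MergeStatisticsStep so integer division no longer truncates the scaling factor, and re-wrap long lines / reorder imports.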
---
.../engine/mr/steps/MergeDictionaryMapper.java | 20 ++++++---
.../kylin/engine/mr/steps/MergeStatisticsStep.java | 12 ++++--
.../kylin/engine/spark/SparkMergingDictionary.java | 47 +++++++++++++++-------
3 files changed, 55 insertions(+), 24 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
index 8e09783..0ce013e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
@@ -133,6 +133,8 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
Configuration conf = null;
int averageSamplingPercentage = 0;
+ long sourceRecordCount = 0;
+ long effectiveTimeRange = 0;
for (CubeSegment cubeSegment : mergingSegments) {
String filePath = cubeSegment.getStatisticsResourcePath();
@@ -162,7 +164,14 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
if (keyW.get() == 0L) {
// sampling percentage;
averageSamplingPercentage += Bytes.toInt(valueW.getBytes());
- } else if (keyW.get() > 0) {
+ } else if (keyW.get() == -3) {
+ long perSourceRecordCount = Bytes.toLong(valueW.getBytes());
+ if (perSourceRecordCount > 0) {
+ sourceRecordCount += perSourceRecordCount;
+ CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+ effectiveTimeRange += iSegment.getTSRange().duration();
+ }
+ } else if (keyW.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(valueW.getBytes());
hll.readRegisters(byteArray.asBuffer());
@@ -181,12 +190,13 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
IOUtils.closeStream(reader);
}
}
-
- averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
- CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
- averageSamplingPercentage);
+ sourceRecordCount *= effectiveTimeRange == 0 ? 0
+ : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
Path statisticsFilePath = new Path(statOutputPath,
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+ CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+ averageSamplingPercentage, sourceRecordCount);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream fis = fs.open(statisticsFilePath);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 5d4b35d..9fab177 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -78,7 +78,8 @@ public class MergeStatisticsStep extends AbstractExecutable {
long sourceRecordCount = 0;
long effectiveTimeRange = 0;
for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
- String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
+ String fileKey = CubeSegment
+ .getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
InputStream is = rs.getResource(fileKey).content();
File tempFile = null;
FileOutputStream tempFileStream = null;
@@ -129,12 +130,15 @@ public class MergeStatisticsStep extends AbstractExecutable {
tempFile.delete();
}
}
- sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
- averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+ sourceRecordCount *= effectiveTimeRange == 0 ? 0
+ : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
+ averageSamplingPercentage = averageSamplingPercentage
+ / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
CubeStatsWriter.writeCuboidStatistics(conf,
new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
averageSamplingPercentage, sourceRecordCount);
- Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()),
+ BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream is = fs.open(statisticsFilePath);
try {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 286c6f3..7e58871 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -18,8 +18,13 @@
package org.apache.kylin.engine.spark;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -62,14 +67,11 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import scala.Tuple2;
/**
merge dictionary
@@ -236,14 +238,16 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
Configuration conf = null;
int averageSamplingPercentage = 0;
+ long sourceRecordCount = 0;
+ long effectiveTimeRange = 0;
for (CubeSegment cubeSegment : mergingSegments) {
String filePath = cubeSegment.getStatisticsResourcePath();
File tempFile = File.createTempFile(segmentId, ".seq");
- try(InputStream is = rs.getResource(filePath).content();
- FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
+ try (InputStream is = rs.getResource(filePath).content();
+ FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
org.apache.commons.io.IOUtils.copy(is, tempFileStream);
}
@@ -252,15 +256,24 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
conf = HadoopUtil.getCurrentConfiguration();
- try(SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf)) {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ new Path(tempFile.getAbsolutePath()), conf)) {
//noinspection deprecation
LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(),
+ conf);
while (reader.next(key, value)) {
if (key.get() == 0L) {
// sampling percentage
averageSamplingPercentage += Bytes.toInt(value.getBytes());
+ } else if (key.get() == -3) {
+ long perSourceRecordCount = Bytes.toLong(value.getBytes());
+ if (perSourceRecordCount > 0) {
+ sourceRecordCount += perSourceRecordCount;
+ CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+ effectiveTimeRange += iSegment.getTSRange().duration();
+ }
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -276,9 +289,13 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
}
}
+ sourceRecordCount *= effectiveTimeRange == 0 ? 0
+ : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
- CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
- Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+ averageSamplingPercentage, sourceRecordCount);
+ Path statisticsFilePath = new Path(statOutputPath,
+ BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream fis = fs.open(statisticsFilePath);