You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/07 12:49:23 UTC

[kylin] 02/02: KYLIN-3926 Code review

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ab124ac9ffd988c9e1ec4b0d9513db49724b70a1
Author: nichunen <ni...@apache.org>
AuthorDate: Tue May 7 13:59:24 2019 +0800

    KYLIN-3926 Code review
---
 .../engine/mr/steps/MergeDictionaryMapper.java     | 20 ++++++---
 .../kylin/engine/mr/steps/MergeStatisticsStep.java | 12 ++++--
 .../kylin/engine/spark/SparkMergingDictionary.java | 47 +++++++++++++++-------
 3 files changed, 55 insertions(+), 24 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
index 8e09783..0ce013e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
@@ -133,6 +133,8 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
             Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
             Configuration conf = null;
             int averageSamplingPercentage = 0;
+            long sourceRecordCount = 0;
+            long effectiveTimeRange = 0;
 
             for (CubeSegment cubeSegment : mergingSegments) {
                 String filePath = cubeSegment.getStatisticsResourcePath();
@@ -162,7 +164,14 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
                         if (keyW.get() == 0L) {
                             // sampling percentage;
                             averageSamplingPercentage += Bytes.toInt(valueW.getBytes());
-                        } else if (keyW.get() > 0) {
+                        } else if (keyW.get() == -3) {
+                            long perSourceRecordCount = Bytes.toLong(valueW.getBytes());
+                            if (perSourceRecordCount > 0) {
+                                sourceRecordCount += perSourceRecordCount;
+                                CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+                                effectiveTimeRange += iSegment.getTSRange().duration();
+                            }
+                        }  else if (keyW.get() > 0) {
                             HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
                             ByteArray byteArray = new ByteArray(valueW.getBytes());
                             hll.readRegisters(byteArray.asBuffer());
@@ -181,12 +190,13 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
                     IOUtils.closeStream(reader);
                 }
             }
-
-            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
-            CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
-                    averageSamplingPercentage);
+            sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                    : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
             Path statisticsFilePath = new Path(statOutputPath,
                     BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+            CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+                    averageSamplingPercentage, sourceRecordCount);
 
             FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream fis = fs.open(statisticsFilePath);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 5d4b35d..9fab177 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -78,7 +78,8 @@ public class MergeStatisticsStep extends AbstractExecutable {
             long sourceRecordCount = 0;
             long effectiveTimeRange = 0;
             for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
-                String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
+                String fileKey = CubeSegment
+                        .getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
                 InputStream is = rs.getResource(fileKey).content();
                 File tempFile = null;
                 FileOutputStream tempFileStream = null;
@@ -129,12 +130,15 @@ public class MergeStatisticsStep extends AbstractExecutable {
                         tempFile.delete();
                 }
             }
-            sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
-            averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+            sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                    : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
+            averageSamplingPercentage = averageSamplingPercentage
+                    / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
             CubeStatsWriter.writeCuboidStatistics(conf,
                     new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
                     averageSamplingPercentage, sourceRecordCount);
-            Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()),
+                    BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 286c6f3..7e58871 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -18,8 +18,13 @@
 
 package org.apache.kylin.engine.spark;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -62,14 +67,11 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import scala.Tuple2;
 
 /**
     merge dictionary
@@ -236,14 +238,16 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
                     Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
                     Configuration conf = null;
                     int averageSamplingPercentage = 0;
+                    long sourceRecordCount = 0;
+                    long effectiveTimeRange = 0;
 
                     for (CubeSegment cubeSegment : mergingSegments) {
                         String filePath = cubeSegment.getStatisticsResourcePath();
 
                         File tempFile = File.createTempFile(segmentId, ".seq");
 
-                        try(InputStream is = rs.getResource(filePath).content();
-                            FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
+                        try (InputStream is = rs.getResource(filePath).content();
+                                FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
 
                             org.apache.commons.io.IOUtils.copy(is, tempFileStream);
                         }
@@ -252,15 +256,24 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
 
                         conf = HadoopUtil.getCurrentConfiguration();
 
-                        try(SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf)) {
+                        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+                                new Path(tempFile.getAbsolutePath()), conf)) {
                             //noinspection deprecation
                             LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(),
+                                    conf);
 
                             while (reader.next(key, value)) {
                                 if (key.get() == 0L) {
                                     // sampling percentage
                                     averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                                } else if (key.get() == -3) {
+                                    long perSourceRecordCount = Bytes.toLong(value.getBytes());
+                                    if (perSourceRecordCount > 0) {
+                                        sourceRecordCount += perSourceRecordCount;
+                                        CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+                                        effectiveTimeRange += iSegment.getTSRange().duration();
+                                    }
                                 } else if (key.get() > 0) {
                                     HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
                                     ByteArray byteArray = new ByteArray(value.getBytes());
@@ -276,9 +289,13 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
                         }
                     }
 
+                    sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                            : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
                     averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
-                    CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
-                    Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+                    CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+                            averageSamplingPercentage, sourceRecordCount);
+                    Path statisticsFilePath = new Path(statOutputPath,
+                            BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
 
                     FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
                     FSDataInputStream fis = fs.open(statisticsFilePath);