You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/05/10 09:30:12 UTC

[kylin] branch 2.6.x updated (85c7388 -> edab859)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a change to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 85c7388  KYLIN-3995 config data type may make oom
     new dff3a3c  KYLIN-3926 set sourceRecordCount when updating statistics
     new edab859  KYLIN-3926 Code review

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kylin/engine/mr/common/CubeStatsReader.java    |  4 ++
 .../kylin/engine/mr/common/CubeStatsWriter.java    |  5 +++
 .../engine/mr/steps/MergeDictionaryMapper.java     | 20 ++++++---
 .../kylin/engine/mr/steps/MergeStatisticsStep.java | 24 +++++++++--
 .../mr/steps/MergeStatisticsWithOldStep.java       |  2 +-
 .../kylin/engine/spark/SparkMergingDictionary.java | 47 +++++++++++++++-------
 6 files changed, 77 insertions(+), 25 deletions(-)


[kylin] 02/02: KYLIN-3926 Code review

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit edab859ee51128f94d4e192abc56dc1171c468a9
Author: nichunen <ni...@apache.org>
AuthorDate: Tue May 7 13:59:24 2019 +0800

    KYLIN-3926 Code review
---
 .../engine/mr/steps/MergeDictionaryMapper.java     | 20 ++++++---
 .../kylin/engine/mr/steps/MergeStatisticsStep.java | 12 ++++--
 .../kylin/engine/spark/SparkMergingDictionary.java | 47 +++++++++++++++-------
 3 files changed, 55 insertions(+), 24 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
index 8e09783..0ce013e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
@@ -133,6 +133,8 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
             Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
             Configuration conf = null;
             int averageSamplingPercentage = 0;
+            long sourceRecordCount = 0;
+            long effectiveTimeRange = 0;
 
             for (CubeSegment cubeSegment : mergingSegments) {
                 String filePath = cubeSegment.getStatisticsResourcePath();
@@ -162,7 +164,14 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
                         if (keyW.get() == 0L) {
                             // sampling percentage;
                             averageSamplingPercentage += Bytes.toInt(valueW.getBytes());
-                        } else if (keyW.get() > 0) {
+                        } else if (keyW.get() == -3) {
+                            long perSourceRecordCount = Bytes.toLong(valueW.getBytes());
+                            if (perSourceRecordCount > 0) {
+                                sourceRecordCount += perSourceRecordCount;
+                                CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+                                effectiveTimeRange += iSegment.getTSRange().duration();
+                            }
+                        }  else if (keyW.get() > 0) {
                             HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
                             ByteArray byteArray = new ByteArray(valueW.getBytes());
                             hll.readRegisters(byteArray.asBuffer());
@@ -181,12 +190,13 @@ public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable
                     IOUtils.closeStream(reader);
                 }
             }
-
-            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
-            CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
-                    averageSamplingPercentage);
+            sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                    : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
             Path statisticsFilePath = new Path(statOutputPath,
                     BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+            CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+                    averageSamplingPercentage, sourceRecordCount);
 
             FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream fis = fs.open(statisticsFilePath);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 5d4b35d..9fab177 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -78,7 +78,8 @@ public class MergeStatisticsStep extends AbstractExecutable {
             long sourceRecordCount = 0;
             long effectiveTimeRange = 0;
             for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
-                String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
+                String fileKey = CubeSegment
+                        .getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
                 InputStream is = rs.getResource(fileKey).content();
                 File tempFile = null;
                 FileOutputStream tempFileStream = null;
@@ -129,12 +130,15 @@ public class MergeStatisticsStep extends AbstractExecutable {
                         tempFile.delete();
                 }
             }
-            sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
-            averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+            sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                    : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
+            averageSamplingPercentage = averageSamplingPercentage
+                    / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
             CubeStatsWriter.writeCuboidStatistics(conf,
                     new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
                     averageSamplingPercentage, sourceRecordCount);
-            Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()),
+                    BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 286c6f3..7e58871 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -18,8 +18,13 @@
 
 package org.apache.kylin.engine.spark;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -62,14 +67,11 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import scala.Tuple2;
 
 /**
     merge dictionary
@@ -236,14 +238,16 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
                     Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
                     Configuration conf = null;
                     int averageSamplingPercentage = 0;
+                    long sourceRecordCount = 0;
+                    long effectiveTimeRange = 0;
 
                     for (CubeSegment cubeSegment : mergingSegments) {
                         String filePath = cubeSegment.getStatisticsResourcePath();
 
                         File tempFile = File.createTempFile(segmentId, ".seq");
 
-                        try(InputStream is = rs.getResource(filePath).content();
-                            FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
+                        try (InputStream is = rs.getResource(filePath).content();
+                                FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
 
                             org.apache.commons.io.IOUtils.copy(is, tempFileStream);
                         }
@@ -252,15 +256,24 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
 
                         conf = HadoopUtil.getCurrentConfiguration();
 
-                        try(SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf)) {
+                        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+                                new Path(tempFile.getAbsolutePath()), conf)) {
                             //noinspection deprecation
                             LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(),
+                                    conf);
 
                             while (reader.next(key, value)) {
                                 if (key.get() == 0L) {
                                     // sampling percentage
                                     averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                                } else if (key.get() == -3) {
+                                    long perSourceRecordCount = Bytes.toLong(value.getBytes());
+                                    if (perSourceRecordCount > 0) {
+                                        sourceRecordCount += perSourceRecordCount;
+                                        CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+                                        effectiveTimeRange += iSegment.getTSRange().duration();
+                                    }
                                 } else if (key.get() > 0) {
                                     HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
                                     ByteArray byteArray = new ByteArray(value.getBytes());
@@ -276,9 +289,13 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
                         }
                     }
 
+                    sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                            : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
                     averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
-                    CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
-                    Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+                    CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+                            averageSamplingPercentage, sourceRecordCount);
+                    Path statisticsFilePath = new Path(statOutputPath,
+                            BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
 
                     FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
                     FSDataInputStream fis = fs.open(statisticsFilePath);


[kylin] 01/02: KYLIN-3926 set sourceRecordCount when updating statistics

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit dff3a3c3c9dd2802e44de8273b785339ce232441
Author: kyotoYaho <nj...@apache.org>
AuthorDate: Mon Apr 1 16:39:26 2019 +0800

    KYLIN-3926 set sourceRecordCount when updating statistics
---
 .../org/apache/kylin/engine/mr/common/CubeStatsReader.java |  4 ++++
 .../org/apache/kylin/engine/mr/common/CubeStatsWriter.java |  5 +++++
 .../apache/kylin/engine/mr/steps/MergeStatisticsStep.java  | 14 +++++++++++++-
 .../kylin/engine/mr/steps/MergeStatisticsWithOldStep.java  |  2 +-
 4 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 58f0e66..e935173 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -161,6 +161,10 @@ public class CubeStatsReader {
         return samplingPercentage;
     }
 
+    public long getSourceRowCount() {
+        return sourceRowCount;
+    }
+
     public Map<Long, Long> getCuboidRowEstimatesHLL() {
         return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index c3d6042..0945908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -44,6 +44,11 @@ public class CubeStatsWriter {
     }
 
     public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
+            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, long sourceRecordCoun) throws IOException {
+        writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, sourceRecordCoun);
+    }
+
+    public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
             Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
             long sourceRecordCoun) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 64ceebe..5d4b35d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -75,6 +75,8 @@ public class MergeStatisticsStep extends AbstractExecutable {
         try {
 
             int averageSamplingPercentage = 0;
+            long sourceRecordCount = 0;
+            long effectiveTimeRange = 0;
             for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
                 String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
                 InputStream is = rs.getResource(fileKey).content();
@@ -99,6 +101,13 @@ public class MergeStatisticsStep extends AbstractExecutable {
                         if (key.get() == 0L) {
                             // sampling percentage;
                             averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                        } else if (key.get() == -3) {
+                            long perSourceRecordCount = Bytes.toLong(value.getBytes());
+                            if (perSourceRecordCount > 0) {
+                                sourceRecordCount += perSourceRecordCount;
+                                CubeSegment iSegment = cube.getSegmentById(segmentId);
+                                effectiveTimeRange += iSegment.getTSRange().duration();
+                            }
                         } else if (key.get() > 0) {
                             HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision());
                             ByteArray byteArray = new ByteArray(value.getBytes());
@@ -120,8 +129,11 @@ public class MergeStatisticsStep extends AbstractExecutable {
                         tempFile.delete();
                 }
             }
+            sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
             averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
-            CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
+            CubeStatsWriter.writeCuboidStatistics(conf,
+                    new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
+                    averageSamplingPercentage, sourceRecordCount);
             Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
index 434892c..8dd7341 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -120,7 +120,7 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable {
 
             String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams());
             CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap,
-                    averageSamplingPercentage);
+                    averageSamplingPercentage, oldSegmentStatsReader.getSourceRowCount());
 
             try (FSDataInputStream mergedStats = hdfs
                     .open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {