You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/30 07:31:58 UTC

[kylin] branch 2.6.x updated: KYLIN-3873 Fix inappropriate use of memory in SparkFactDistinct.java

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new 0add42c  KYLIN-3873 Fix inappropriate use of memory in SparkFactDistinct.java
0add42c is described below

commit 0add42c5e6862ef8edca4b08aed1fc390a246b9d
Author: chao long <wa...@qq.com>
AuthorDate: Wed Mar 27 19:36:59 2019 +0800

    KYLIN-3873 Fix inappropriate use of memory in SparkFactDistinct.java
---
 .../kylin/engine/spark/SparkFactDistinct.java      | 120 +++++++++++----------
 1 file changed, 65 insertions(+), 55 deletions(-)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index edbc307..53b8a4d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -193,12 +192,23 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
 
             final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
 
+            // read records from the flat table
+            // output:
+            //   1, statistics
+            //   2, field values of dict cols
+            //   3, min/max field values of non-dict cols
             JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
                     new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
 
+            // repartition data so that each reducer handles only one col's data or the statistics data
             JavaPairRDD<SelfDefineSortableKey, Text> aggredRDD = flatOutputRDD
                 .repartitionAndSortWithinPartitions(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
 
+            // multiple output result
+            // 1, CFG_OUTPUT_COLUMN: field values of dict col, which will not be built in reducer, like globalDictCol
+            // 2, CFG_OUTPUT_DICT: dictionary object built in reducer
+            // 3, CFG_OUTPUT_STATISTICS: cube statistic: hll of cuboids ...
+            // 4, CFG_OUTPUT_PARTITION: dimension value range(min,max)
             JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
                     .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
 
@@ -237,6 +247,12 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
         }
     }
 
+    /**
+     * output: Tuple2<SelfDefineSortableKey, Text>
+     * 1, for statistics, SelfDefineSortableKey = reducerId + cuboidId, Text = hll of cuboidId
+     * 2, for dict col, SelfDefineSortableKey = reducerId + field value, Text = ""
+     * 3, for not dict col, SelfDefineSortableKey = reducerId + min/max value, Text = ""
+     */
     static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
         private transient volatile boolean initialized = false;
         private String cubeName;
@@ -655,6 +671,8 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
         private CubeDesc cubeDesc;
         private String maxValue = null;
         private String minValue = null;
+        private boolean isDimensionCol;
+        private boolean isDictCol;
         private List<Tuple2<String, Tuple3<Writable, Writable, String>>> result;
 
         public MultiOutputFunction(String cubeName, String metaurl, SerializableConfiguration conf,
@@ -690,6 +708,9 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
                     col = reducerMapping.getColForReducer(taskId);
                     Preconditions.checkNotNull(col);
 
+                    isDimensionCol = cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) && col.getType().needCompare();
+                    isDictCol = cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col);
+
                     // local build dict
                     buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
                     if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
@@ -727,33 +748,12 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
                     }
                 }
             }
-            SelfDefineSortableKey prevKey = null;
-            List<Text> values = new ArrayList<Text>();
-            while (tuple2Iterator.hasNext()) {
-                Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
-                if (prevKey != null) {
-                    int cmp = tuple._1.compareTo(prevKey);
-                    // check
-                    if (cmp < 0) {
-                        throw new IOException(" key must be sorted. prevKey: " + prevKey + " current: " + tuple._1);
-                    } else if (cmp > 0) {
-                        processRow(prevKey.getText(), values);
-                        prevKey = null;
-                        values.clear();
-                    }
-                }
-                prevKey = tuple._1;
-                values.add(tuple._2);
-            }
-
-            if (prevKey != null || values.size() > 0) {
-                processRow(prevKey.getText(), values);
-                prevKey = null;
-                values.clear();
-            }
 
             if (isStatistics) {
-                //output the hll info
+                // calculate hll
+                calculateStatistics(tuple2Iterator);
+
+                // output the hll info
                 List<Long> allCuboids = Lists.newArrayList();
                 allCuboids.addAll(cuboidHLLMap.keySet());
                 Collections.sort(allCuboids);
@@ -761,11 +761,15 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
                 logMapperAndCuboidStatistics(allCuboids); // for human check
                 outputStatistics(allCuboids, result);
             } else {
-                //dimension col
-                if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                // calculate dict / dim range data
+                calculateColData(tuple2Iterator);
+
+                // output dim range
+                if (isDimensionCol) {
                     outputDimRangeInfo(result);
                 }
-                // dic col
+
+                // output dict object
                 if (buildDictInReducer) {
                     Dictionary<String> dict = builder.build();
                     outputDict(col, dict, result);
@@ -775,33 +779,40 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
             return result.iterator();
         }
 
-        private void processRow(Text key, List<Text> values) throws IOException {
-            if (isStatistics) {
-                // for hll
-                long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+        private void calculateStatistics(Iterator<Tuple2<SelfDefineSortableKey, Text>> tuple2Iterator) throws IOException {
+            while (tuple2Iterator.hasNext()) {
+                HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
 
-                for (Text value : values) {
-                    HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
-                    ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
-                    hll.readRegisters(bf);
+                Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
+                long cuboidId = Bytes.toLong(tuple._1.getText().getBytes(), 1);
 
-                    totalRowsBeforeMerge += hll.getCountEstimate();
+                ByteBuffer bf = ByteBuffer.wrap(tuple._2.getBytes(), 0, tuple._2.getLength());
+                hll.readRegisters(bf);
 
-                    if (cuboidId == baseCuboidId) {
-                        baseCuboidRowCountInMappers.add(hll.getCountEstimate());
-                    }
+                totalRowsBeforeMerge += hll.getCountEstimate();
 
-                    if (cuboidHLLMap.get(cuboidId) != null) {
-                        cuboidHLLMap.get(cuboidId).merge(hll);
-                    } else {
-                        cuboidHLLMap.put(cuboidId, hll);
-                    }
+                if (cuboidId == baseCuboidId) {
+                    baseCuboidRowCountInMappers.add(hll.getCountEstimate());
                 }
-            } else {
-                String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+
+                if (cuboidHLLMap.get(cuboidId) != null) {
+                    cuboidHLLMap.get(cuboidId).merge(hll);
+                } else {
+                    cuboidHLLMap.put(cuboidId, hll);
+                }
+            }
+        }
+
+        private void calculateColData(Iterator<Tuple2<SelfDefineSortableKey, Text>> tuple2Iterator) {
+            while (tuple2Iterator.hasNext()) {
+                Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
+
+                String value = Bytes.toString(tuple._1.getText().getBytes(), 1, tuple._1.getText().getLength() - 1);
                 logAFewRows(value);
+
                 // if dimension col, compute max/min value
-                if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) && col.getType().needCompare()) {
+                // include the col which is both dict col and dim col
+                if (isDimensionCol) {
                     if (minValue == null || col.getType().compare(minValue, value) > 0) {
                         minValue = value;
                     }
@@ -811,20 +822,19 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
                 }
 
                 //if dict column
-                if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                if (isDictCol) {
                     if (buildDictInReducer) {
                         builder.addValue(value);
                     } else {
-                        byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
                         // output written to baseDir/colName/-r-00000 (etc)
-                        String fileName = col.getIdentity() + "/";
                         result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(
-                            BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
-                            NullWritable.get(), new Text(keyBytes), fileName)));
+                                BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
+                                NullWritable.get(), new Text(value.getBytes(StandardCharsets.UTF_8)), col.getIdentity() + "/")));
                     }
                 }
+
+                rowCount++;
             }
-            rowCount++;
         }
 
         private void logMapperAndCuboidStatistics(List<Long> allCuboids) {