Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/30 07:31:58 UTC
[kylin] branch 2.6.x updated: KYLIN-3873 Fix inappropriate use of memory in SparkFactDistinct.java
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new 0add42c KYLIN-3873 Fix inappropriate use of memory in SparkFactDistinct.java
0add42c is described below
commit 0add42c5e6862ef8edca4b08aed1fc390a246b9d
Author: chao long <wa...@qq.com>
AuthorDate: Wed Mar 27 19:36:59 2019 +0800
KYLIN-3873 Fix inappropriate use of memory in SparkFactDistinct.java
---
.../kylin/engine/spark/SparkFactDistinct.java | 120 +++++++++++----------
1 file changed, 65 insertions(+), 55 deletions(-)
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index edbc307..53b8a4d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -193,12 +192,23 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+ // read records from the flat table
+ // output:
+ // 1. statistics
+ // 2. field values of dict columns
+ // 3. min/max field values of non-dict columns
JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+ // repartition data so that each reducer handles either a single column's data or the statistics data
JavaPairRDD<SelfDefineSortableKey, Text> aggredRDD = flatOutputRDD
.repartitionAndSortWithinPartitions(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+ // multiple output results:
+ // 1. CFG_OUTPUT_COLUMN: field values of dict columns whose dictionaries are not built in the reducer, e.g. global dict columns
+ // 2. CFG_OUTPUT_DICT: dictionary objects built in the reducer
+ // 3. CFG_OUTPUT_STATISTICS: cube statistics, e.g. HLL counters of cuboids
+ // 4. CFG_OUTPUT_PARTITION: dimension value range (min, max)
JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
.mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
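The pipeline above relies on repartitionAndSortWithinPartitions, which routes each record to a fixed reducer and sorts keys within each partition in a single shuffle. A minimal sketch of such a partitioner, assuming (as the key layout documented below suggests) that the first byte of the serialized key carries the reducer id; the class and names here are illustrative, not Kylin's FactDistinctPartitioner itself:

    import org.apache.spark.Partitioner;

    // Routes each record to the reducer encoded in the key's first byte.
    public class ReducerIdPartitioner extends Partitioner {
        private final int numReducers;

        public ReducerIdPartitioner(int numReducers) {
            this.numReducers = numReducers;
        }

        @Override
        public int numPartitions() {
            return numReducers;
        }

        @Override
        public int getPartition(Object key) {
            byte[] bytes = (byte[]) key; // stand-in for the real key's serialized form
            return bytes[0] & 0xFF;      // first byte = reducer id
        }
    }

Because repartitionAndSortWithinPartitions sorts by the keys' natural ordering, the key class (SelfDefineSortableKey in this job) must be Comparable.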
@@ -237,6 +247,12 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
}
}
+ /**
+ * output: Tuple2<SelfDefineSortableKey, Text>
+ * 1. for statistics: SelfDefineSortableKey = reducerId + cuboidId, Text = HLL counter of the cuboid
+ * 2. for dict cols: SelfDefineSortableKey = reducerId + field value, Text = ""
+ * 3. for non-dict cols: SelfDefineSortableKey = reducerId + min/max value, Text = ""
+ */
static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
private transient volatile boolean initialized = false;
private String cubeName;
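The javadoc above describes a composite key: a one-byte reducer id followed by the payload (cuboid id or field value). A minimal sketch of that layout, with hypothetical helper names that are not Kylin API:

    import java.nio.charset.StandardCharsets;

    public final class CompositeKeys {
        private CompositeKeys() {}

        // reducerId (one byte) followed by the UTF-8 field value
        static byte[] encode(int reducerId, String fieldValue) {
            byte[] payload = fieldValue.getBytes(StandardCharsets.UTF_8);
            byte[] key = new byte[payload.length + 1];
            key[0] = (byte) reducerId; // assumes fewer than 256 reducers
            System.arraycopy(payload, 0, key, 1, payload.length);
            return key;
        }

        // mirrors the Bytes.toString(bytes, 1, length - 1) calls in this diff
        static String decodeValue(byte[] key) {
            return new String(key, 1, key.length - 1, StandardCharsets.UTF_8);
        }
    }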
@@ -655,6 +671,8 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
private CubeDesc cubeDesc;
private String maxValue = null;
private String minValue = null;
+ private boolean isDimensionCol;
+ private boolean isDictCol;
private List<Tuple2<String, Tuple3<Writable, Writable, String>>> result;
public MultiOutputFunction(String cubeName, String metaurl, SerializableConfiguration conf,
@@ -690,6 +708,9 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
col = reducerMapping.getColForReducer(taskId);
Preconditions.checkNotNull(col);
+ isDimensionCol = cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) && col.getType().needCompare();
+ isDictCol = cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col);
+
// local build dict
buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
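Note that isDimensionCol and isDictCol are now computed once in the setup path rather than re-evaluating contains() on every row, which is the same hoisting used in calculateColData further down. A minimal illustration of the pattern, with illustrative types:

    import java.util.Set;

    class ColumnRoleFlags {
        final boolean isDimensionCol;
        final boolean isDictCol;

        // Evaluated once per task, not once per record.
        ColumnRoleFlags(String col, Set<String> dimCols, Set<String> dictCols) {
            this.isDimensionCol = dimCols.contains(col);
            this.isDictCol = dictCols.contains(col);
        }
    }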
@@ -727,33 +748,12 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
}
}
}
- SelfDefineSortableKey prevKey = null;
- List<Text> values = new ArrayList<Text>();
- while (tuple2Iterator.hasNext()) {
- Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
- if (prevKey != null) {
- int cmp = tuple._1.compareTo(prevKey);
- // check
- if (cmp < 0) {
- throw new IOException(" key must be sorted. prevKey: " + prevKey + " current: " + tuple._1);
- } else if (cmp > 0) {
- processRow(prevKey.getText(), values);
- prevKey = null;
- values.clear();
- }
- }
- prevKey = tuple._1;
- values.add(tuple._2);
- }
-
- if (prevKey != null || values.size() > 0) {
- processRow(prevKey.getText(), values);
- prevKey = null;
- values.clear();
- }
if (isStatistics) {
- //output the hll info
+ // calculate HLL counters
+ calculateStatistics(tuple2Iterator);
+
+ // output the HLL info
List<Long> allCuboids = Lists.newArrayList();
allCuboids.addAll(cuboidHLLMap.keySet());
Collections.sort(allCuboids);
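This hunk is the core of KYLIN-3873: the removed loop buffered every Text value for a key in an ArrayList before calling processRow, so a hot key could pin an arbitrary number of values in memory at once. The replacement methods instead consume the sorted iterator one tuple at a time. A minimal sketch of the streaming pattern, assuming scala-library on the classpath; the names are illustrative:

    import java.util.Iterator;
    import java.util.function.BiConsumer;

    import scala.Tuple2;

    class StreamingConsumer {
        // Drains a sorted iterator one tuple at a time; peak memory stays
        // constant instead of growing with the number of values per key.
        static <K, V> void consume(Iterator<Tuple2<K, V>> sortedIterator,
                                   BiConsumer<K, V> processor) {
            while (sortedIterator.hasNext()) {
                Tuple2<K, V> tuple = sortedIterator.next();
                processor.accept(tuple._1, tuple._2);
            }
        }
    }

This works here because repartitionAndSortWithinPartitions has already grouped and ordered the keys, so no per-key accumulation is needed before processing.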
@@ -761,11 +761,15 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
logMapperAndCuboidStatistics(allCuboids); // for human check
outputStatistics(allCuboids, result);
} else {
- //dimension col
- if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ // calculate dict / dimension range
+ calculateColData(tuple2Iterator);
+
+ // output dim range
+ if (isDimensionCol) {
outputDimRangeInfo(result);
}
- // dic col
+
+ // output dict object
if (buildDictInReducer) {
Dictionary<String> dict = builder.build();
outputDict(col, dict, result);
@@ -775,33 +779,40 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
return result.iterator();
}
- private void processRow(Text key, List<Text> values) throws IOException {
- if (isStatistics) {
- // for hll
- long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+ private void calculateStatistics(Iterator<Tuple2<SelfDefineSortableKey, Text>> tuple2Iterator) throws IOException {
+ while (tuple2Iterator.hasNext()) {
+ HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
- for (Text value : values) {
- HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
- ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
- hll.readRegisters(bf);
+ Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
+ long cuboidId = Bytes.toLong(tuple._1.getText().getBytes(), 1);
- totalRowsBeforeMerge += hll.getCountEstimate();
+ ByteBuffer bf = ByteBuffer.wrap(tuple._2.getBytes(), 0, tuple._2.getLength());
+ hll.readRegisters(bf);
- if (cuboidId == baseCuboidId) {
- baseCuboidRowCountInMappers.add(hll.getCountEstimate());
- }
+ totalRowsBeforeMerge += hll.getCountEstimate();
- if (cuboidHLLMap.get(cuboidId) != null) {
- cuboidHLLMap.get(cuboidId).merge(hll);
- } else {
- cuboidHLLMap.put(cuboidId, hll);
- }
+ if (cuboidId == baseCuboidId) {
+ baseCuboidRowCountInMappers.add(hll.getCountEstimate());
}
- } else {
- String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+
+ if (cuboidHLLMap.get(cuboidId) != null) {
+ cuboidHLLMap.get(cuboidId).merge(hll);
+ } else {
+ cuboidHLLMap.put(cuboidId, hll);
+ }
+ }
+ }
+
+ private void calculateColData(Iterator<Tuple2<SelfDefineSortableKey, Text>> tuple2Iterator) {
+ while (tuple2Iterator.hasNext()) {
+ Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
+
+ String value = Bytes.toString(tuple._1.getText().getBytes(), 1, tuple._1.getText().getLength() - 1);
logAFewRows(value);
+
// if dimension col, compute max/min value
- if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) && col.getType().needCompare()) {
+ // this includes cols that are both dict cols and dim cols
+ if (isDimensionCol) {
if (minValue == null || col.getType().compare(minValue, value) > 0) {
minValue = value;
}
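calculateStatistics above keeps one HLL counter per cuboid and merges partial counters as they stream in, instead of materializing them all first. A minimal sketch of that aggregation pattern; HllSketch is a hypothetical stand-in for Kylin's HLLCounter:

    import java.util.HashMap;
    import java.util.Map;

    class CuboidStats {
        // Hypothetical stand-in for Kylin's HLLCounter.
        interface HllSketch {
            long getCountEstimate();
            void merge(HllSketch other);
        }

        final Map<Long, HllSketch> cuboidHllMap = new HashMap<>();
        long totalRowsBeforeMerge = 0;

        void accumulate(long cuboidId, HllSketch partial) {
            totalRowsBeforeMerge += partial.getCountEstimate();
            HllSketch existing = cuboidHllMap.get(cuboidId);
            if (existing != null) {
                existing.merge(partial); // fold into the running counter
            } else {
                cuboidHllMap.put(cuboidId, partial); // first sighting of this cuboid
            }
        }
    }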
@@ -811,20 +822,19 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
}
//if dict column
- if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+ if (isDictCol) {
if (buildDictInReducer) {
builder.addValue(value);
} else {
- byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
// output written to baseDir/colName/-r-00000 (etc)
- String fileName = col.getIdentity() + "/";
result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(
- BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
- NullWritable.get(), new Text(keyBytes), fileName)));
+ BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
+ NullWritable.get(), new Text(value.getBytes(StandardCharsets.UTF_8)), col.getIdentity() + "/")));
}
}
+
+ rowCount++;
}
- rowCount++;
}
private void logMapperAndCuboidStatistics(List<Long> allCuboids) {
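For dimension columns, calculateColData only ever retains two String references (minValue/maxValue) while streaming, which is what keeps outputDimRangeInfo cheap. A minimal sketch of that running min/max tracking, using a generic comparator in place of Kylin's DataType.compare; names are illustrative:

    import java.util.Comparator;

    class RangeTracker {
        private final Comparator<String> cmp;
        String minValue = null;
        String maxValue = null;

        RangeTracker(Comparator<String> cmp) {
            this.cmp = cmp;
        }

        // Only two live String references, regardless of row count.
        void update(String value) {
            if (minValue == null || cmp.compare(minValue, value) > 0) {
                minValue = value;
            }
            if (maxValue == null || cmp.compare(maxValue, value) < 0) {
                maxValue = value;
            }
        }
    }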