You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/08 09:03:11 UTC
[kylin] 05/06: KYLIN-3423 Performance improvement in
FactDistinctColumnsMapper
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d1ed1072728ca0bd12b83a04e2308784b2e045a6
Author: ZhansShaoxiong <sh...@gmail.com>
AuthorDate: Tue Jun 26 02:07:42 2018 +0800
KYLIN-3423 Performance improvement in FactDistinctColumnsMapper
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../org/apache/kylin/cube/DimensionRangeInfo.java | 8 ++
.../engine/mr/steps/FactDistinctColumnsMapper.java | 127 +++++++++++++++++----
.../mr/steps/FactDistinctColumnsMapperBase.java | 11 +-
.../FactDistinctColumnsReducerMappingTest.java | 4 +-
.../kylin/engine/mr/steps/DictColDeduperTest.java | 65 +++++++++++
5 files changed, 185 insertions(+), 30 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
index e36ca96..0b0d1c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -92,9 +92,17 @@ public class DimensionRangeInfo {
return min;
}
+ public void setMin(String min) { // overwrites the stored minimum as-is; nothing here compares it against max
+ this.min = min;
+ }
+
public String getMax() {
return max;
}
+
+ public void setMax(String max) { // overwrites the stored maximum as-is; nothing here compares it against min
+ this.max = max;
+ }
@Override
public String toString() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index fc9dc65..7bffce7 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -29,7 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.DimensionRangeInfo;
import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
@@ -37,9 +41,11 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.measure.hllc.RegisterType;
import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
@@ -62,6 +68,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
private int rowCount = 0;
private int samplingPercentage;
private ByteBuffer tmpbuf;
+
+ private DictColDeduper dictColDeduper;
+ private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
private CuboidStatCalculator[] cuboidStatCalculators;
@@ -132,6 +141,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
cuboidStatCalculators[i] = calculator;
calculator.start();
}
+
+ // setup dict col deduper
+ dictColDeduper = new DictColDeduper();
+ Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+ for (int i = 0; i < allCols.size(); i++) {
+ if (dictCols.contains(allCols.get(i)))
+ dictColDeduper.setIsDictCol(i);
+ }
}
private int getStatsThreadNum(int cuboidNum) {
@@ -156,41 +173,42 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
for (String[] row : rowCollection) {
context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
- for (int i = 0; i < allDimDictCols.size(); i++) {
+ for (int i = 0; i < allCols.size(); i++) {
String fieldValue = row[columnIndex[i]];
if (fieldValue == null)
continue;
- int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue);
-
- tmpbuf.clear();
- byte[] valueBytes = Bytes.toBytes(fieldValue);
- int size = valueBytes.length + 1;
- if (size >= tmpbuf.capacity()) {
- tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
- }
- tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
- tmpbuf.put(valueBytes);
- outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- DataType type = allDimDictCols.get(i).getType();
- sortableKey.init(outputKey, type);
- context.write(sortableKey, EMPTY_TEXT);
-
- // log a few rows for troubleshooting
- if (rowCount < 10) {
- logger.info(
- "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+ final DataType type = allCols.get(i).getType();
+
+ // for dictionary columns, de-dup before writing the value; for dimension columns without a dictionary, only track min/max and hold them until doCleanup()
+ if (dictColDeduper.isDictCol(i)) {
+ if (dictColDeduper.add(i, fieldValue)) {
+ writeFieldValue(context, type, i, fieldValue);
+ }
+ } else {
+ DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+ if (old == null) {
+ old = new DimensionRangeInfo(fieldValue, fieldValue);
+ dimensionRangeInfoMap.put(i, old);
+ } else {
+ old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+ old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+ }
}
}
if (rowCount % 100 < samplingPercentage) {
putRowKeyToHLL(row);
}
+
+ if (rowCount % 100 == 0) {
+ dictColDeduper.resetIfShortOfMem();
+ }
rowCount++;
}
}
-
+
private void putRowKeyToHLL(String[] row) {
for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
cuboidStatCalculator.putRow(row);
@@ -231,6 +249,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
context.write(sortableKey, outputValue);
}
}
+ for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+ DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+ DataType dataType = allCols.get(colIndex).getType();
+ writeFieldValue(context, dataType, colIndex, rangeInfo.getMin());
+ writeFieldValue(context, dataType, colIndex, rangeInfo.getMax());
+ }
}
private int countNewSize(int oldSize, int dataSize) {
@@ -241,6 +265,26 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
return newSize;
}
+ private void writeFieldValue(Context context, DataType type, Integer colIndex, String value)
+ throws IOException, InterruptedException {
+ int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+ tmpbuf.clear();
+ byte[] valueBytes = Bytes.toBytes(value);
+ int size = valueBytes.length + 1;
+ if (size >= tmpbuf.capacity()) {
+ tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+ }
+ tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+ tmpbuf.put(valueBytes);
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ sortableKey.init(outputKey, type);
+ context.write(sortableKey, EMPTY_TEXT);
+ // log a few rows for troubleshooting
+ if (rowCount < 10) {
+ logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+ }
+ }
+
public static class CuboidStatCalculator implements Runnable {
private final int id;
private final int nRowKey;
@@ -371,4 +415,45 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
}
}
+
+    /** Remembers dictionary-column values already seen so the mapper can skip writing repeats; honours the memory gate. */
+    public static class DictColDeduper {
+        final boolean enabled; // decided once at construction; when false, add() caches nothing and reports every value as new
+        final int resetThresholdMB;
+        final Map<Integer, Set<String>> colValueSets = Maps.newHashMap(); // column index -> values seen since the last reset
+
+        public DictColDeduper() {
+            this(200, 100);
+        }
+
+        /** Caching is enabled only if at least enableThresholdMB is available now; sets are cleared below resetThresholdMB. */
+        public DictColDeduper(int enableThresholdMB, int resetThresholdMB) {
+            this.enabled = MemoryBudgetController.getSystemAvailMB() >= enableThresholdMB;
+            this.resetThresholdMB = resetThresholdMB;
+        }
+
+        public void setIsDictCol(int i) {
+            colValueSets.put(i, new HashSet<String>());
+        }
+
+        public boolean isDictCol(int i) {
+            return colValueSets.containsKey(i);
+        }
+
+        /** Returns true if the caller should write the value: always when disabled, otherwise only when not yet cached. */
+        public boolean add(int i, String fieldValue) {
+            return !enabled || colValueSets.get(i).add(fieldValue);
+        }
+
+        public Set<String> getValueSet(int i) {
+            return colValueSets.get(i);
+        }
+
+        public void resetIfShortOfMem() {
+            if (MemoryBudgetController.getSystemAvailMB() < resetThresholdMB) { // drop cached values, keep the column registry
+                for (Set<String> set : colValueSets.values())
+                    set.clear();
+            }
+        }
+    }
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index ceddeb5..ad9030c 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -49,10 +49,9 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
protected CubeDesc cubeDesc;
protected long baseCuboidId;
protected IMRTableInputFormat flatTableInputFormat;
- protected List<TblColRef> allDimDictCols;
+ protected List<TblColRef> allCols;
protected Text outputKey = new Text();
- //protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
protected Text outputValue = new Text();
protected int errorRecordCounter = 0;
@@ -73,14 +72,14 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
cubeDesc = cube.getDescriptor();
baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
reducerMapping = new FactDistinctColumnsReducerMapping(cube);
- allDimDictCols = reducerMapping.getAllDimDictCols();
+ allCols = reducerMapping.getAllDimDictCols();
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
- columnIndex = new int[allDimDictCols.size()];
- for (int i = 0; i < allDimDictCols.size(); i++) {
- TblColRef colRef = allDimDictCols.get(i);
+ columnIndex = new int[allCols.size()];
+ for (int i = 0; i < allCols.size(); i++) {
+ TblColRef colRef = allCols.get(i);
int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
columnIndex[i] = columnIndexOnFlatTbl;
}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index 3c58b26..9a00d55 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@ -54,9 +54,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
TblColRef aUHC = cube.getModel().findColumn("TEST_COUNT_DISTINCT_BITMAP");
FactDistinctColumnsReducerMapping mapping = new FactDistinctColumnsReducerMapping(cube);
- //System.out.println(mapping.getAllDictCols());
- //System.out.println(Arrays.toString(mapping.getAllRolePlaysForReducers()));
-
+
int totalReducerNum = mapping.getTotalReducerNum();
Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
diff --git a/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java b/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java
new file mode 100644
index 0000000..1634843
--- /dev/null
+++ b/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Exercises {@link DictColDeduper}: dict-column registration, duplicate detection and the low-memory reset.
+ */
+public class DictColDeduperTest {
+
+    /** Reset threshold 0: the test expects no reset to take effect, so a repeated value stays a duplicate. */
+    @Test
+    public void testBasics() {
+        final DictColDeduper deduper = new DictColDeduper(50, 0);
+
+        deduper.setIsDictCol(0);
+        deduper.setIsDictCol(2);
+
+        Assert.assertTrue(deduper.isDictCol(0));
+        Assert.assertFalse(deduper.isDictCol(1));
+        Assert.assertTrue(deduper.isDictCol(2));
+        Assert.assertFalse(deduper.isDictCol(3));
+
+        for (int col : new int[] { 0, 2 }) {
+            Assert.assertTrue(deduper.add(col, "abc"));
+            deduper.resetIfShortOfMem();
+            Assert.assertFalse(deduper.add(col, "abc"));
+        }
+    }
+
+    /** Reset threshold Integer.MAX_VALUE: the test expects the reset to clear the sets, so the value is new again. */
+    @Test
+    public void testReset() {
+        final DictColDeduper deduper = new DictColDeduper(50, Integer.MAX_VALUE);
+
+        deduper.setIsDictCol(0);
+        deduper.setIsDictCol(2);
+
+        Assert.assertTrue(deduper.add(0, "abc"));
+        Assert.assertTrue(deduper.add(2, "abc"));
+
+        deduper.resetIfShortOfMem();
+        Assert.assertTrue(deduper.add(0, "abc"));
+        Assert.assertTrue(deduper.add(2, "abc"));
+    }
+}