You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/08 09:03:11 UTC

[kylin] 05/06: KYLIN-3423 Performance improvement in FactDistinctColumnsMapper

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d1ed1072728ca0bd12b83a04e2308784b2e045a6
Author: ZhansShaoxiong <sh...@gmail.com>
AuthorDate: Tue Jun 26 02:07:42 2018 +0800

    KYLIN-3423 Performance improvement in FactDistinctColumnsMapper
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/cube/DimensionRangeInfo.java  |   8 ++
 .../engine/mr/steps/FactDistinctColumnsMapper.java | 127 +++++++++++++++++----
 .../mr/steps/FactDistinctColumnsMapperBase.java    |  11 +-
 .../FactDistinctColumnsReducerMappingTest.java     |   4 +-
 .../kylin/engine/mr/steps/DictColDeduperTest.java  |  65 +++++++++++
 5 files changed, 185 insertions(+), 30 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
index e36ca96..0b0d1c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -92,9 +92,17 @@ public class DimensionRangeInfo {
         return min;
     }
 
+    public void setMin(String min) {
+        this.min = min;
+    }
+
     public String getMax() {
         return max;
     }
+
+    public void setMax(String max) {
+        this.max = max;
+    }
     
     @Override
     public String toString() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index fc9dc65..7bffce7 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -29,7 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
@@ -37,9 +41,11 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.measure.hllc.RegisterType;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
@@ -62,6 +68,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     private int rowCount = 0;
     private int samplingPercentage;
     private ByteBuffer tmpbuf;
+    
+    private DictColDeduper dictColDeduper;
+    private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
 
     private CuboidStatCalculator[] cuboidStatCalculators;
 
@@ -132,6 +141,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             cuboidStatCalculators[i] = calculator;
             calculator.start();
         }
+        
+        // setup dict col deduper
+        dictColDeduper = new DictColDeduper();
+        Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+        for (int i = 0; i < allCols.size(); i++) {
+            if (dictCols.contains(allCols.get(i)))
+                dictColDeduper.setIsDictCol(i);
+        }
     }
 
     private int getStatsThreadNum(int cuboidNum) {
@@ -156,41 +173,42 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
         for (String[] row : rowCollection) {
             context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
-            for (int i = 0; i < allDimDictCols.size(); i++) {
+            for (int i = 0; i < allCols.size(); i++) {
                 String fieldValue = row[columnIndex[i]];
                 if (fieldValue == null)
                     continue;
 
-                int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue);
-                
-                tmpbuf.clear();
-                byte[] valueBytes = Bytes.toBytes(fieldValue);
-                int size = valueBytes.length + 1;
-                if (size >= tmpbuf.capacity()) {
-                    tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
-                }
-                tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
-                tmpbuf.put(valueBytes);
-                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                DataType type = allDimDictCols.get(i).getType();
-                sortableKey.init(outputKey, type);
-                context.write(sortableKey, EMPTY_TEXT);
-
-                // log a few rows for troubleshooting
-                if (rowCount < 10) {
-                    logger.info(
-                            "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+                final DataType type = allCols.get(i).getType();
+
+                //for dic column, de dup before write value; for dim not dic column, hold util doCleanup()
+                if (dictColDeduper.isDictCol(i)) {
+                    if (dictColDeduper.add(i, fieldValue)) {
+                        writeFieldValue(context, type, i, fieldValue);
+                    }
+                } else {
+                    DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+                    if (old == null) {
+                        old = new DimensionRangeInfo(fieldValue, fieldValue);
+                        dimensionRangeInfoMap.put(i, old);
+                    } else {
+                        old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+                        old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+                    }
                 }
             }
 
             if (rowCount % 100 < samplingPercentage) {
                 putRowKeyToHLL(row);
             }
+            
+            if (rowCount % 100 == 0) {
+                dictColDeduper.resetIfShortOfMem();
+            }
 
             rowCount++;
         }
     }
-
+    
     private void putRowKeyToHLL(String[] row) {
         for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
             cuboidStatCalculator.putRow(row);
@@ -231,6 +249,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 context.write(sortableKey, outputValue);
             }
         }
+        for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+            DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+            DataType dataType = allCols.get(colIndex).getType();
+            writeFieldValue(context, dataType, colIndex, rangeInfo.getMin());
+            writeFieldValue(context, dataType, colIndex, rangeInfo.getMax());
+        }
     }
 
     private int countNewSize(int oldSize, int dataSize) {
@@ -241,6 +265,26 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         return newSize;
     }
 
+    private void writeFieldValue(Context context, DataType type, Integer colIndex, String value)
+            throws IOException, InterruptedException {
+        int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+        tmpbuf.clear();
+        byte[] valueBytes = Bytes.toBytes(value);
+        int size = valueBytes.length + 1;
+        if (size >= tmpbuf.capacity()) {
+            tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+        }
+        tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+        tmpbuf.put(valueBytes);
+        outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+        sortableKey.init(outputKey, type);
+        context.write(sortableKey, EMPTY_TEXT);
+        // log a few rows for troubleshooting
+        if (rowCount < 10) {
+            logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+        }
+    }
+
     public static class CuboidStatCalculator implements Runnable {
         private final int id;
         private final int nRowKey;
@@ -371,4 +415,45 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             }
         }
     }
+    
+    public static class DictColDeduper {
+
+        final boolean enabled;
+        final int resetThresholdMB;
+        final Map<Integer, Set<String>> colValueSets = Maps.newHashMap();
+        
+        public DictColDeduper() {
+            this(200, 100);
+        }
+        
+        public DictColDeduper(int enableThresholdMB, int resetThresholdMB) {
+            // only enable when there is sufficient memory
+            this.enabled = MemoryBudgetController.getSystemAvailMB() >= enableThresholdMB;
+            this.resetThresholdMB = resetThresholdMB;
+        }
+        
+        public void setIsDictCol(int i) {
+            colValueSets.put(i, new HashSet<String>());
+        }
+        
+        public boolean isDictCol(int i) {
+            return colValueSets.containsKey(i);
+        }
+
+        public boolean add(int i, String fieldValue) {
+            return colValueSets.get(i).add(fieldValue);
+        }
+        
+        public Set<String> getValueSet(int i) {
+            return colValueSets.get(i);
+        }
+
+        public void resetIfShortOfMem() {
+            if (MemoryBudgetController.getSystemAvailMB() < resetThresholdMB) {
+                for (Set<String> set : colValueSets.values())
+                    set.clear();
+            }
+        }
+        
+    }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index ceddeb5..ad9030c 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -49,10 +49,9 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected CubeDesc cubeDesc;
     protected long baseCuboidId;
     protected IMRTableInputFormat flatTableInputFormat;
-    protected List<TblColRef> allDimDictCols;
+    protected List<TblColRef> allCols;
 
     protected Text outputKey = new Text();
-    //protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
     protected Text outputValue = new Text();
     protected int errorRecordCounter = 0;
 
@@ -73,14 +72,14 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         reducerMapping = new FactDistinctColumnsReducerMapping(cube);
-        allDimDictCols = reducerMapping.getAllDimDictCols();
+        allCols = reducerMapping.getAllDimDictCols();
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
         intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
-        columnIndex = new int[allDimDictCols.size()];
-        for (int i = 0; i < allDimDictCols.size(); i++) {
-            TblColRef colRef = allDimDictCols.get(i);
+        columnIndex = new int[allCols.size()];
+        for (int i = 0; i < allCols.size(); i++) {
+            TblColRef colRef = allCols.get(i);
             int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
             columnIndex[i] = columnIndexOnFlatTbl;
         }
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index 3c58b26..9a00d55 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@ -54,9 +54,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
         TblColRef aUHC = cube.getModel().findColumn("TEST_COUNT_DISTINCT_BITMAP");
 
         FactDistinctColumnsReducerMapping mapping = new FactDistinctColumnsReducerMapping(cube);
-        //System.out.println(mapping.getAllDictCols());
-        //System.out.println(Arrays.toString(mapping.getAllRolePlaysForReducers()));
-
+        
         int totalReducerNum = mapping.getTotalReducerNum();
         Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
         
diff --git a/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java b/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java
new file mode 100644
index 0000000..1634843
--- /dev/null
+++ b/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ */
+public class DictColDeduperTest {
+
+    @Test
+    public void testBasics() {
+        DictColDeduper dd = new DictColDeduper(50, 0);
+        
+        dd.setIsDictCol(0);
+        dd.setIsDictCol(2);
+        
+        Assert.assertTrue(dd.isDictCol(0));
+        Assert.assertTrue(!dd.isDictCol(1));
+        Assert.assertTrue(dd.isDictCol(2));
+        Assert.assertTrue(!dd.isDictCol(3));
+        
+        Assert.assertTrue(dd.add(0, "abc"));
+        dd.resetIfShortOfMem();
+        Assert.assertTrue(!dd.add(0, "abc"));
+        
+        Assert.assertTrue(dd.add(2, "abc"));
+        dd.resetIfShortOfMem();
+        Assert.assertTrue(!dd.add(2, "abc"));
+    }
+    
+    @Test
+    public void testReset() {
+        DictColDeduper dd = new DictColDeduper(50, Integer.MAX_VALUE);
+        
+        dd.setIsDictCol(0);
+        dd.setIsDictCol(2);
+        
+        Assert.assertTrue(dd.add(0, "abc"));
+        Assert.assertTrue(dd.add(2, "abc"));
+        
+        dd.resetIfShortOfMem();
+        
+        Assert.assertTrue(dd.add(0, "abc"));
+        Assert.assertTrue(dd.add(2, "abc"));
+    }
+}