You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/11 12:58:46 UTC

incubator-kylin git commit: KYLIN-1137 TopN measure need support dictionary merge

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1126 8796fa37f -> 6476549c9


KYLIN-1137 	TopN measure need support dictionary merge

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6476549c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6476549c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6476549c

Branch: refs/heads/KYLIN-1126
Commit: 6476549c9690da8eeefa4fe6c0ee476c3d9bd561
Parents: 8796fa3
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 11 19:58:02 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 11 19:58:02 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/topn/Counter.java   |  4 +
 .../org/apache/kylin/cube/model/CubeDesc.java   | 10 +++
 .../engine/mr/steps/InMemCuboidMapper.java      | 26 ++-----
 .../engine/mr/steps/MergeCuboidMapper.java      | 79 +++++++++++++++++++-
 .../engine/mr/steps/MergeDictionaryStep.java    | 17 ++---
 5 files changed, 106 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 2bca4df..461e083 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -50,6 +50,10 @@ public class Counter<T> implements Externalizable {
     public T getItem() {
         return item;
     }
+    
+    public void setItem(T item) { // replaces the item held by this counter
+        this.item = item; // only the item reference is reassigned here; nothing else is modified
+    }
 
     public double getCount() {
         return count;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 160090a..2709247 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -847,4 +847,14 @@ public class CubeDesc extends RootPersistentEntity {
     public LinkedHashSet<TblColRef> getMeasureDisplayColumns() {
         return measureDisplayColumns;
     }
+
+
+    public boolean hasMeasureUsingDictionary() { // true when at least one measure of this cube is a TopN measure
+        for (MeasureDesc measureDesc : this.getMeasures()) {
+            if (measureDesc.getFunction().isTopN()) // TopN is the only measure kind tested by this check
+                return true;
+        }
+
+        return false; // no measure reported isTopN()
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 2bf627b..d724c76 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -65,26 +65,14 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
 
         Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
 
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (cubeDesc.getRowkey().isUseDictionary(col)) {
-                    Dictionary<?> dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        logger.warn("Dictionary for " + col + " was not found.");
-                    }
-
-                    dictionaryMap.put(col, cubeSegment.getDictionary(col));
-                }
-            }
-        }
-        
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            if (measureDesc.getFunction().isTopN()) {
-                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
-                TblColRef col = colRefs.get(colRefs.size() - 1);
-                dictionaryMap.put(col, cubeSegment.getDictionary(col));
+        // dictionary
+        for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
+            Dictionary<?> dict = cubeSegment.getDictionary(col);
+            if (dict == null) {
+                logger.warn("Dictionary for " + col + " was not found.");
             }
+
+            dictionaryMap.put(col, cubeSegment.getDictionary(col));
         }
         
         DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 0b68e59..b3b38dc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -19,13 +19,18 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.SplittedBytes;
@@ -43,6 +48,9 @@ import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -68,6 +76,12 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private RowKeyEncoderProvider rowKeyEncoderProvider;
 
     private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+    private List<MeasureDesc> measuresDescs;
+    private MeasureCodec codec;
+    private Object[] measureObjs;
+    private Integer[] measureIdxUsingDict;
+    private ByteBuffer valueBuf;
+    private Text outputValue;
 
     private Boolean checkNeedMerging(TblColRef col) throws IOException {
         Boolean ret = dictsNeedMerging.get(col);
@@ -108,6 +122,22 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
+        
+        if (cubeDesc.hasMeasureUsingDictionary()) {
+            measuresDescs = cubeDesc.getMeasures();
+            codec = new MeasureCodec(measuresDescs);
+            measureObjs = new Object[measuresDescs.size()];
+            List<Integer> measuresUsingDict = Lists.newArrayList();
+            for (int i = 0; i < measuresDescs.size(); i++) {
+                if (measuresDescs.get(i).getFunction().isTopN()) {
+                    // so far only TopN uses dic
+                    measuresUsingDict.add(i);
+                }
+            }
+            measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
+            valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+            outputValue = new Text();
+        }
     }
 
     private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
@@ -200,6 +230,53 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        context.write(outputKey, value);
+        
+        // encode measure if it uses dictionary 
+        if (cubeDesc.hasMeasureUsingDictionary()) {
+            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+            bufOffset = 0;
+            for (int idx : measureIdxUsingDict) {
+                // only TopN measure uses dic
+                TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
+
+                MeasureDesc measureDesc = measuresDescs.get(idx);
+                String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn();
+                TblColRef col = cubeDesc.findColumnRef(cubeDesc.getFactTable(), displayCol);
+                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
+                int topNSize = topNCounters.size();
+                while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length  - bufOffset || //
+                        mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length  - bufOffset || //
+                        mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+                    byte[] oldBuf = newKeyBodyBuf;
+                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+                }
+
+                for (Counter<ByteArray> c : topNCounters) {
+                    int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                    int idInMergedDict;
+                    int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+                    if (size < 0) {
+                        idInMergedDict = mergedDict.nullId();
+                    } else {
+                        idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
+                    }
+
+                    BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                    c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()));
+                    bufOffset += mergedDict.getSizeOfId();
+                }
+            }
+
+            valueBuf.clear();
+            codec.encode(measureObjs, valueBuf);
+            outputValue.set(valueBuf.array(), 0, valueBuf.position());
+            context.write(outputKey, outputValue);
+        } else {
+            context.write(outputKey, value);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index 2db4ce7..b73fda4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -110,16 +110,13 @@ public class MergeDictionaryStep extends AbstractExecutable {
         DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
 
         CubeDesc cubeDesc = cube.getDescriptor();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
-                    if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
-                        colsNeedMeringDict.add(col);
-                    } else {
-                        colsNeedCopyDict.add(col);
-                    }
-                }
+
+        for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
+            String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), "true", col).getTable();
+            if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
+                colsNeedMeringDict.add(col);
+            } else {
+                colsNeedCopyDict.add(col);
             }
         }