You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/13 09:26:20 UTC

incubator-kylin git commit: KYLIN-1137 add topN support in MergeCuboidFromStorageMapper for MR_V2

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1126 0cead28c3 -> 3d0334fe7


KYLIN-1137 add topN support in MergeCuboidFromStorageMapper for MR_V2

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3d0334fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3d0334fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3d0334fe

Branch: refs/heads/KYLIN-1126
Commit: 3d0334fe73cf922e5365007c295cd118d3f9b0e0
Parents: 0cead28
Author: shaofengshi <sh...@apache.org>
Authored: Fri Nov 13 16:25:34 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 13 16:25:34 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/topn/Counter.java   |  4 -
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 69 +++++++++++++++
 .../engine/mr/steps/MergeCuboidMapper.java      | 89 +++++++++++---------
 3 files changed, 116 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d0334fe/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 461e083..31c5ed1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -51,10 +51,6 @@ public class Counter<T> implements Externalizable {
         return item;
     }
     
-    public void setItem(T item) {
-        this.item = item;
-    }
-
     public double getCount() {
         return count;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d0334fe/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 50f3d4c..286ff02 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -21,8 +21,14 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Pair;
@@ -45,6 +51,8 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -81,6 +89,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
     private MeasureCodec codec;
     private ByteArrayWritable outputValue = new ByteArrayWritable();
 
+    private List<MeasureDesc> measuresDescs;
+    private Integer[] measureIdxUsingDict;
+    
     private Boolean checkNeedMerging(TblColRef col) throws IOException {
         Boolean ret = dictsNeedMerging.get(col);
         if (ret != null)
@@ -119,7 +130,18 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
 
+        measuresDescs = cubeDesc.getMeasures();
         codec = new MeasureCodec(cubeDesc.getMeasures());
+        if (cubeDesc.hasMeasureUsingDictionary()) {
+            List<Integer> measuresUsingDict = Lists.newArrayList();
+            for (int i = 0; i < measuresDescs.size(); i++) {
+                if (measuresDescs.get(i).getFunction().isTopN()) {
+                    // so far only TopN uses dic
+                    measuresUsingDict.add(i);
+                }
+            }
+            measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
+        }
     }
 
     @Override
@@ -191,6 +213,11 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
+        // encode measure if it uses dictionary 
+        if (cubeDesc.hasMeasureUsingDictionary()) {
+            reEncodeMeasure(value);
+        } 
+        
         valueBuf.clear();
         codec.encode(value, valueBuf);
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
@@ -198,4 +225,46 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         context.write(outputKey, outputValue);
     }
 
+    private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
+        int bufOffset = 0;
+        for (int idx : measureIdxUsingDict) {
+            // only TopN measure uses dic
+            TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
+
+            MeasureDesc measureDesc = measuresDescs.get(idx);
+            String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
+            if (StringUtils.isNotEmpty(displayCol)) {
+                ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
+                TblColRef colRef = new TblColRef(sourceColumn);
+                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+                int topNSize = topNCounters.size();
+                while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                        mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                        mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+                    byte[] oldBuf = newKeyBodyBuf;
+                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+                }
+
+                for (Counter<ByteArray> c : topNCounters) {
+                    int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                    int idInMergedDict;
+                    int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+                    if (size < 0) {
+                        idInMergedDict = mergedDict.nullId();
+                    } else {
+                        idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
+                    }
+
+                    BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                    c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                    bufOffset += mergedDict.getSizeOfId();
+                }
+            }
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d0334fe/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index ac46b99..68d1481 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -234,52 +234,57 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         // encode measure if it uses dictionary 
         if (cubeDesc.hasMeasureUsingDictionary()) {
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
-            bufOffset = 0;
-            for (int idx : measureIdxUsingDict) {
-                // only TopN measure uses dic
-                TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
-
-                MeasureDesc measureDesc = measuresDescs.get(idx);
-                String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
-                if (StringUtils.isNotEmpty(displayCol)) {
-                    ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
-                    TblColRef colRef = new TblColRef(sourceColumn);
-                    DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                    Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
-                    Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
-
-                    int topNSize = topNCounters.size();
-                    while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                            mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                            mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
-                        byte[] oldBuf = newKeyBodyBuf;
-                        newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                        System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-                    }
+            reEncodeMeasure(measureObjs);
+            valueBuf.clear();
+            codec.encode(measureObjs, valueBuf);
+            outputValue.set(valueBuf.array(), 0, valueBuf.position());
+            value = outputValue;
+        } 
+            
+        context.write(outputKey, value);
+    }
+
+    private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
+        int bufOffset = 0;
+        for (int idx : measureIdxUsingDict) {
+            // only TopN measure uses dic
+            TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
+
+            MeasureDesc measureDesc = measuresDescs.get(idx);
+            String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
+            if (StringUtils.isNotEmpty(displayCol)) {
+                ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
+                TblColRef colRef = new TblColRef(sourceColumn);
+                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+                int topNSize = topNCounters.size();
+                while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                        mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                        mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+                    byte[] oldBuf = newKeyBodyBuf;
+                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+                }
 
-                    for (Counter<ByteArray> c : topNCounters) {
-                        int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
-                        int idInMergedDict;
-                        int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                        if (size < 0) {
-                            idInMergedDict = mergedDict.nullId();
-                        } else {
-                            idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
-                        }
-
-                        BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                        c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()));
-                        bufOffset += mergedDict.getSizeOfId();
+                for (Counter<ByteArray> c : topNCounters) {
+                    int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                    int idInMergedDict;
+                    int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+                    if (size < 0) {
+                        idInMergedDict = mergedDict.nullId();
+                    } else {
+                        idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
                     }
+
+                    BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                    c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                    bufOffset += mergedDict.getSizeOfId();
                 }
             }
-
-            valueBuf.clear();
-            codec.encode(measureObjs, valueBuf);
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-            context.write(outputKey, outputValue);
-        } else {
-            context.write(outputKey, value);
         }
+
     }
+    
 }