You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was lost in this plain-text copy).
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:08 UTC

[13/13] incubator-kylin git commit: KYLIN-976 extract MeasureIngester.reEncodeDictionary()

KYLIN-976 extract MeasureIngester.reEncodeDictionary()


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/dac24880
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/dac24880
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/dac24880

Branch: refs/heads/KYLIN-976
Commit: dac248804b331f491175e06fd9f598235b2c9e6f
Parents: ce61309
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Nov 27 16:59:51 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Nov 27 16:59:51 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/measure/MeasureIngester.java   |  2 +
 .../org/apache/kylin/measure/MeasureType.java   |  3 -
 .../kylin/measure/basic/BasicMeasureType.java   |  7 --
 .../kylin/measure/basic/BigDecimalIngester.java |  6 ++
 .../kylin/measure/basic/DoubleIngester.java     |  6 ++
 .../kylin/measure/basic/LongIngester.java       |  6 ++
 .../kylin/measure/hllc/HLLCMeasureType.java     | 11 ++-
 .../kylin/measure/topn/TopNMeasureType.java     | 43 +++++++++---
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 68 +++++++------------
 .../engine/mr/steps/MergeCuboidMapper.java      | 71 +++++++-------------
 10 files changed, 107 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
index 8d6e601..9c7b406 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -41,4 +41,6 @@ abstract public class MeasureIngester<V> {
     }
 
     abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
+    
+    abstract public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts);
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 604a7b6..4fe59c0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -21,7 +21,6 @@ package org.apache.kylin.measure;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.measure.basic.BasicMeasureFactory;
 import org.apache.kylin.measure.hllc.HLLCAggregationFactory;
 import org.apache.kylin.measure.topn.TopNMeasureFactory;
@@ -86,8 +85,6 @@ abstract public class MeasureType {
  
     abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
     
-    abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
-
     /* ============================================================================
      * Cube Selection
      * ---------------------------------------------------------------------------- */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index f6bf090..fe53bab 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -20,7 +20,6 @@ package org.apache.kylin.measure.basic;
 
 import java.util.List;
 
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
@@ -126,10 +125,4 @@ public class BasicMeasureType extends MeasureType {
         return null;
     }
 
-    @Override
-    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
index bb743d6..ea1495c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -28,6 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
 
+    @Override
     public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         if (values.length > 1)
             throw new IllegalArgumentException();
@@ -37,4 +38,9 @@ public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
         else
             return new BigDecimal(values[0]);
     }
+
+    @Override
+    public BigDecimal reEncodeDictionary(BigDecimal value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index 506ed19..aaa754a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -31,6 +31,7 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> {
     // avoid repeated object creation
     private DoubleMutable current = new DoubleMutable();
 
+    @Override
     public DoubleMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         if (values.length > 1)
             throw new IllegalArgumentException();
@@ -42,4 +43,9 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> {
             l.set(Double.parseDouble(values[0]));
         return l;
     }
+
+    @Override
+    public DoubleMutable reEncodeDictionary(DoubleMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index 5bf1257..bdc1704 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -31,6 +31,7 @@ public class LongIngester extends MeasureIngester<LongMutable> {
     // avoid repeated object creation
     private LongMutable current = new LongMutable();
 
+    @Override
     public LongMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         if (values.length > 1)
             throw new IllegalArgumentException();
@@ -42,4 +43,9 @@ public class LongIngester extends MeasureIngester<LongMutable> {
             l.set(Long.parseLong(values[0]));
         return l;
     }
+
+    @Override
+    public LongMutable reEncodeDictionary(LongMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index ee90818..2ad7630 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -74,6 +74,11 @@ public class HLLCMeasureType extends MeasureType {
                     hllc.add(v == null ? "__nUlL__" : v);
                 return hllc;
             }
+
+            @Override
+            public HyperLogLogPlusCounter reEncodeDictionary(HyperLogLogPlusCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
+                throw new UnsupportedOperationException();
+            }
         };
     }
 
@@ -91,10 +96,4 @@ public class HLLCMeasureType extends MeasureType {
         return null;
     }
 
-    @Override
-    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 1d2c87b..561f9f1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -18,9 +18,11 @@
 
 package org.apache.kylin.measure.topn;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
@@ -88,6 +90,37 @@ public class TopNMeasureType extends MeasureType {
                 topNCounter.offer(key, counter);
                 return topNCounter;
             }
+            
+            @SuppressWarnings("unchecked")
+            @Override
+            public TopNCounter reEncodeDictionary(TopNCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
+                TopNCounter<ByteArray> topNCounter = (TopNCounter<ByteArray>) value;
+
+                TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
+                Dictionary<String> sourceDict = oldDicts.get(colRef);
+                Dictionary<String> mergedDict = newDicts.get(colRef);
+
+                int topNSize = topNCounter.size();
+                byte[] newIdBuf = new byte[topNSize * mergedDict.getSizeOfId()];
+                byte[] literal = new byte[sourceDict.getSizeOfValue()];
+
+                int bufOffset = 0;
+                for (Counter<ByteArray> c : topNCounter) {
+                    int oldId = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                    int newId;
+                    int size = sourceDict.getValueBytesFromId(oldId, literal, 0);
+                    if (size < 0) {
+                        newId = mergedDict.nullId();
+                    } else {
+                        newId = mergedDict.getIdFromValueBytes(literal, 0, size);
+                    }
+
+                    BytesUtil.writeUnsigned(newId, newIdBuf, bufOffset, mergedDict.getSizeOfId());
+                    c.getItem().set(newIdBuf, bufOffset, mergedDict.getSizeOfId());
+                    bufOffset += mergedDict.getSizeOfId();
+                }
+                return value;
+            }
         };
     }
 
@@ -98,14 +131,8 @@ public class TopNMeasureType extends MeasureType {
 
     @Override
     public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
-        // TODO Auto-generated method stub
-        return null;
+        TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(1);
+        return Collections.singletonList(literalCol);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index bc1c883..b4682dd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -22,10 +22,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
@@ -48,6 +47,8 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -60,6 +61,7 @@ import com.google.common.collect.Lists;
 /**
  * @author shaoshi
  */
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
 
     private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
@@ -87,7 +89,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
     private MeasureCodec codec;
     private ByteArrayWritable outputValue = new ByteArrayWritable();
 
-    private List<Integer> topNMeasureIdx;
+    private List<Pair<Integer, MeasureIngester>> dictMeasures;
+    private Map<TblColRef, Dictionary<String>> oldDicts;
+    private Map<TblColRef, Dictionary<String>> newDicts;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -115,12 +119,18 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         measureDescs = cubeDesc.getMeasures();
         codec = new MeasureCodec(measureDescs);
 
-        topNMeasureIdx = Lists.newArrayList();
+        dictMeasures = Lists.newArrayList();
         for (int i = 0; i < measureDescs.size(); i++) {
-            if (measureDescs.get(i).getFunction().isTopN()) {
-                topNMeasureIdx.add(i);
+            MeasureDesc measureDesc = measureDescs.get(i);
+            MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+            if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
+                dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
             }
         }
+        if (dictMeasures.size() > 0) {
+            oldDicts = sourceCubeSegment.buildDictionaryMap();
+            newDicts = mergedCubeSegment.buildDictionaryMap();
+        }
     }
 
     @Override
@@ -193,7 +203,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
         // re-encode measures if dictionary is used
-        reEncodeMeasure(value);
+        if (dictMeasures.size() > 0) {
+            reEncodeMeasure(value);
+        }
 
         valueBuf.clear();
         codec.encode(value, valueBuf);
@@ -217,45 +229,11 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         }
     }
 
-    @SuppressWarnings("unchecked")
     private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
-        // currently only topN uses dictionary in measure obj
-        if (topNMeasureIdx.isEmpty())
-            return;
-
-        int bufOffset = 0;
-        for (int idx : topNMeasureIdx) {
-            TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
-
-            MeasureDesc measureDesc = measureDescs.get(idx);
-            TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
-            DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-            Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
-            Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
-
-            int topNSize = topNCounters.size();
-            while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                    mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                    mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
-                byte[] oldBuf = newKeyBodyBuf;
-                newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-            }
-
-            for (Counter<ByteArray> c : topNCounters) {
-                int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
-                int idInMergedDict;
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                if (size < 0) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
-                }
-
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                bufOffset += mergedDict.getSizeOfId();
-            }
+        for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+            int i = pair.getFirst();
+            MeasureIngester ingester = pair.getSecond();
+            measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index c0277b5..4fc7236 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -22,17 +22,17 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -48,6 +48,8 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -57,6 +59,7 @@ import com.google.common.collect.Lists;
 /**
  * @author ysong1, honma
  */
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
     private KylinConfig config;
@@ -78,7 +81,9 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
 
     // for re-encode measures that use dictionary
-    private List<Integer> topNMeasureIdx;
+    private List<Pair<Integer, MeasureIngester>> dictMeasures;
+    private Map<TblColRef, Dictionary<String>> oldDicts;
+    private Map<TblColRef, Dictionary<String>> newDicts;
     private List<MeasureDesc> measureDescs;
     private MeasureCodec codec;
     private Object[] measureObjs;
@@ -115,12 +120,19 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         measureObjs = new Object[measureDescs.size()];
         valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
         outputValue = new Text();
-        topNMeasureIdx = Lists.newArrayList();
+        
+        dictMeasures = Lists.newArrayList();
         for (int i = 0; i < measureDescs.size(); i++) {
-            if (measureDescs.get(i).getFunction().isTopN()) {
-                topNMeasureIdx.add(i);
+            MeasureDesc measureDesc = measureDescs.get(i);
+            MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+            if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
+                dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
             }
         }
+        if (dictMeasures.size() > 0) {
+            oldDicts = sourceCubeSegment.buildDictionaryMap();
+            newDicts = mergedCubeSegment.buildDictionaryMap();
+        }
     }
 
     private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
@@ -214,9 +226,13 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
         // re-encode measures if dictionary is used
-        if (topNMeasureIdx.size() > 0) {
+        if (dictMeasures.size() > 0) {
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
-            reEncodeMeasure(measureObjs);
+            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+                int i = pair.getFirst();
+                MeasureIngester ingester = pair.getSecond();
+                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+            }
             valueBuf.clear();
             codec.encode(measureObjs, valueBuf);
             outputValue.set(valueBuf.array(), 0, valueBuf.position());
@@ -240,43 +256,4 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
             return ret;
         }
     }
-
-    @SuppressWarnings("unchecked")
-    private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
-        int bufOffset = 0;
-        for (int idx : topNMeasureIdx) {
-            // only TopN measure uses dic
-            TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
-
-            MeasureDesc measureDesc = measureDescs.get(idx);
-            TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
-            DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-            Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
-            Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
-
-            int topNSize = topNCounters.size();
-            while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                    mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                    mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
-                byte[] oldBuf = newKeyBodyBuf;
-                newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-            }
-
-            for (Counter<ByteArray> c : topNCounters) {
-                int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
-                int idInMergedDict;
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                if (size < 0) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
-                }
-
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                bufOffset += mergedDict.getSizeOfId();
-            }
-        }
-    }
 }